You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:50 UTC

[rocketmq] 09/26: [ISSUE #5406] support transaction message for remoting proxy

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 7e916e3db8421acad01b9fce4743b92d51a4b11d
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Nov 9 16:58:01 2022 +0800

    [ISSUE #5406] support transaction message for remoting proxy
---
 .../proxy/remoting/RemotingProtocolServer.java     |  5 ++
 .../remoting/activity/SendMessageActivity.java     |  8 +--
 .../remoting/activity/TransactionActivity.java     | 68 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index 91c4422d2..d0137b2b4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity;
 import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity;
 import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity;
 import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity;
 import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -67,6 +68,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
     protected final ClientManagerActivity clientManagerActivity;
     protected final ConsumerManagerActivity consumerManagerActivity;
     protected final SendMessageActivity sendMessageActivity;
+    protected final TransactionActivity transactionActivity;
     protected final PullMessageActivity pullMessageActivity;
     protected final PopMessageActivity popMessageActivity;
     protected final AckMessageActivity ackMessageActivity;
@@ -88,6 +90,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
         this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
         this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
         this.sendMessageActivity = new SendMessageActivity(pipeline, messagingProcessor);
+        this.transactionActivity = new TransactionActivity(pipeline, messagingProcessor);
         this.pullMessageActivity = new PullMessageActivity(pipeline, messagingProcessor);
         this.popMessageActivity = new PopMessageActivity(pipeline, messagingProcessor);
         this.ackMessageActivity = new AckMessageActivity(pipeline, messagingProcessor);
@@ -184,6 +187,8 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
         remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
         remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageActivity, sendMessageExecutor);
 
+        remotingServer.registerProcessor(RequestCode.END_TRANSACTION, transactionActivity, sendMessageExecutor);
+
         remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManagerActivity, this.heartbeatExecutor);
         remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManagerActivity, this.defaultExecutor);
         remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManagerActivity, this.defaultExecutor);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 904460431..20fab6e57 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -77,7 +77,7 @@ public class SendMessageActivity extends AbstractRemotingActivity {
         }
         if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
             if (TopicMessageType.TRANSACTION.equals(messageType)) {
-                return sendTransactionMessage(ctx, request, context);
+                messagingProcessor.addTransactionSubscription(context, requestHeader.getProducerGroup(), requestHeader.getTopic());
             }
         }
         return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
@@ -87,10 +87,4 @@ public class SendMessageActivity extends AbstractRemotingActivity {
         ProxyContext context) throws Exception {
         return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
     }
-
-    protected RemotingCommand sendTransactionMessage(ChannelHandlerContext ctx, RemotingCommand request,
-        ProxyContext context) throws Exception {
-        // TODO: wait for connection implement.
-        return null;
-    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
new file mode 100644
index 000000000..24f98a875
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.processor.TransactionStatus;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class TransactionActivity extends AbstractRemotingActivity { // Remoting activity that relays a producer's end-transaction decision to the MessagingProcessor.
+
+    public TransactionActivity(RequestPipeline requestPipeline,
+        MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor); // Both collaborators are handed straight to the base activity.
+    }
+
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null); // Response is created with a null header class.
+        response.setCode(ResponseCode.SUCCESS); // SUCCESS is set up front; nothing below changes the code.
+        response.setRemark(null);
+
+        final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); // Decode the request's custom header into its typed form.
+
+        TransactionStatus transactionStatus = TransactionStatus.UNKNOWN; // Stays UNKNOWN unless the header flag matches one of the cases below.
+        switch (requestHeader.getCommitOrRollback()) {
+            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                transactionStatus = TransactionStatus.COMMIT;
+                break;
+            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                transactionStatus = TransactionStatus.ROLLBACK;
+                break;
+            default: // Any other flag value is forwarded as UNKNOWN.
+                break;
+        }
+
+        this.messagingProcessor.endTransaction( // NOTE(review): any return value is discarded, so the SUCCESS response does not reflect the processor's outcome unless this call throws - confirm intended.
+            context,
+            requestHeader.getTransactionId(),
+            requestHeader.getMsgId(),
+            requestHeader.getProducerGroup(),
+            transactionStatus,
+            requestHeader.getFromTransactionCheck()
+        );
+        return response; // Same response object built at the top of the method.
+    }
+}