You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:50 UTC
[rocketmq] 09/26: [ISSUE #5406] support transaction message for remoting proxy
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 7e916e3db8421acad01b9fce4743b92d51a4b11d
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Nov 9 16:58:01 2022 +0800
[ISSUE #5406] support transaction message for remoting proxy
---
.../proxy/remoting/RemotingProtocolServer.java | 5 ++
.../remoting/activity/SendMessageActivity.java | 8 +--
.../remoting/activity/TransactionActivity.java | 68 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 7 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index 91c4422d2..d0137b2b4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity;
import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity;
import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity;
import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity;
+import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity;
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -67,6 +68,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
protected final ClientManagerActivity clientManagerActivity;
protected final ConsumerManagerActivity consumerManagerActivity;
protected final SendMessageActivity sendMessageActivity;
+ protected final TransactionActivity transactionActivity;
protected final PullMessageActivity pullMessageActivity;
protected final PopMessageActivity popMessageActivity;
protected final AckMessageActivity ackMessageActivity;
@@ -88,6 +90,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
this.sendMessageActivity = new SendMessageActivity(pipeline, messagingProcessor);
+ this.transactionActivity = new TransactionActivity(pipeline, messagingProcessor);
this.pullMessageActivity = new PullMessageActivity(pipeline, messagingProcessor);
this.popMessageActivity = new PopMessageActivity(pipeline, messagingProcessor);
this.ackMessageActivity = new AckMessageActivity(pipeline, messagingProcessor);
@@ -184,6 +187,8 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageActivity, sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.END_TRANSACTION, transactionActivity, sendMessageExecutor);
+
remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManagerActivity, this.heartbeatExecutor);
remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManagerActivity, this.defaultExecutor);
remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManagerActivity, this.defaultExecutor);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 904460431..20fab6e57 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -77,7 +77,7 @@ public class SendMessageActivity extends AbstractRemotingActivity {
}
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
if (TopicMessageType.TRANSACTION.equals(messageType)) {
- return sendTransactionMessage(ctx, request, context);
+ messagingProcessor.addTransactionSubscription(context, requestHeader.getProducerGroup(), requestHeader.getTopic());
}
}
return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
@@ -87,10 +87,4 @@ public class SendMessageActivity extends AbstractRemotingActivity {
ProxyContext context) throws Exception {
return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
}
-
- protected RemotingCommand sendTransactionMessage(ChannelHandlerContext ctx, RemotingCommand request,
- ProxyContext context) throws Exception {
- // TODO: wait for connection implement.
- return null;
- }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
new file mode 100644
index 000000000..24f98a875
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.processor.TransactionStatus;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class TransactionActivity extends AbstractRemotingActivity {
+ // Remoting activity that decodes an EndTransactionRequestHeader and forwards the outcome to MessagingProcessor.endTransaction.
+ public TransactionActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+ // Builds a SUCCESS response up front and returns it unless decoding or endTransaction throws; ctx is not used here.
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ // Decode the request's custom header into its typed EndTransactionRequestHeader form.
+ final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
+ // Map the commitOrRollback flag to a TransactionStatus; any value other than commit/rollback stays UNKNOWN.
+ TransactionStatus transactionStatus = TransactionStatus.UNKNOWN;
+ switch (requestHeader.getCommitOrRollback()) {
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ transactionStatus = TransactionStatus.COMMIT;
+ break;
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ transactionStatus = TransactionStatus.ROLLBACK;
+ break;
+ default:
+ break;
+ }
+ // NOTE(review): the result of endTransaction is ignored; if it completes asynchronously, failures never reach the client - confirm.
+ this.messagingProcessor.endTransaction(
+ context,
+ requestHeader.getTransactionId(),
+ requestHeader.getMsgId(),
+ requestHeader.getProducerGroup(),
+ transactionStatus,
+ requestHeader.getFromTransactionCheck()
+ );
+ return response;
+ }
+}