You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/04/26 10:28:23 UTC

[rocketmq] branch develop updated: [ISSUE #2833] Support trace for TranscationProducer (#2834)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c3d4641  [ISSUE #2833] Support trace for TranscationProducer (#2834)
c3d4641 is described below

commit c3d464108e7c099d3438debbab75e86ffd5f036c
Author: yuz10 <84...@qq.com>
AuthorDate: Mon Apr 26 18:28:10 2021 +0800

    [ISSUE #2833] Support trace for TranscationProducer (#2834)
---
 .../client/hook/EndTransactionContext.java         |  86 +++++++++
 .../EndTransactionHook.java}                       |  10 +-
 .../impl/producer/DefaultMQProducerImpl.java       |  43 ++++-
 .../client/producer/DefaultMQProducer.java         |  13 +-
 .../client/producer/TransactionMQProducer.java     |   4 +
 .../apache/rocketmq/client/trace/TraceBean.java    |  29 ++-
 .../rocketmq/client/trace/TraceDataEncoder.java    |  41 ++++-
 .../apache/rocketmq/client/trace/TraceType.java    |   1 +
 .../trace/hook/EndTransactionTraceHookImpl.java    |  81 ++++++++
 .../client/trace/TraceDataEncoderTest.java         |  45 ++++-
 .../trace/TransactionMQProducerWithTraceTest.java  | 203 +++++++++++++++++++++
 11 files changed, 543 insertions(+), 13 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java
new file mode 100644
index 0000000..5271ade
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.common.message.Message;
+
+public class EndTransactionContext {
+    private String producerGroup;
+    private Message message;
+    private String brokerAddr;
+    private String msgId;
+    private String transactionId;
+    private LocalTransactionState transactionState;
+    private boolean fromTransactionCheck;
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public LocalTransactionState getTransactionState() {
+        return transactionState;
+    }
+
+    public void setTransactionState(LocalTransactionState transactionState) {
+        this.transactionState = transactionState;
+    }
+
+    public boolean isFromTransactionCheck() {
+        return fromTransactionCheck;
+    }
+
+    public void setFromTransactionCheck(boolean fromTransactionCheck) {
+        this.fromTransactionCheck = fromTransactionCheck;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java
similarity index 82%
copy from client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
copy to client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java
index 79b19c1..834cb27 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.trace;
+package org.apache.rocketmq.client.hook;
 
-public enum TraceType {
-    Pub,
-    SubBefore,
-    SubAfter,
+public interface EndTransactionHook {
+    String hookName();
+
+    void endTransaction(final EndTransactionContext context);
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2f9146d..fac3ed3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -44,6 +44,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.hook.CheckForbiddenContext;
 import org.apache.rocketmq.client.hook.CheckForbiddenHook;
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.hook.SendMessageHook;
 import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
         new ConcurrentHashMap<String, TopicPublishInfo>();
     private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
+    private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
     private final RPCHook rpcHook;
     private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
     private final ExecutorService defaultAsyncSenderExecutor;
@@ -171,6 +174,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         log.info("register sendMessage Hook, {}", hook.hookName());
     }
 
+    public void registerEndTransactionHook(final EndTransactionHook hook) {
+        this.endTransactionHookList.add(hook);
+        log.info("register endTransaction Hook, {}", hook.hookName());
+    }
+
     public void start() throws MQClientException {
         this.start(true);
     }
@@ -386,6 +394,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 if (exception != null) {
                     remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
                 }
+                doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
 
                 try {
                     DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
@@ -967,6 +976,36 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         }
     }
 
+    public boolean hasEndTransactionHook() {
+        return !this.endTransactionHookList.isEmpty();
+    }
+
+    public void executeEndTransactionHook(final EndTransactionContext context) {
+        if (!this.endTransactionHookList.isEmpty()) {
+            for (EndTransactionHook hook : this.endTransactionHookList) {
+                try {
+                    hook.endTransaction(context);
+                } catch (Throwable e) {
+                    log.warn("failed to executeEndTransactionHook", e);
+                }
+            }
+        }
+    }
+
+    public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state,
+        boolean fromTransactionCheck) {
+        if (hasEndTransactionHook()) {
+            EndTransactionContext context = new EndTransactionContext();
+            context.setProducerGroup(defaultMQProducer.getProducerGroup());
+            context.setBrokerAddr(brokerAddr);
+            context.setMessage(msg);
+            context.setMsgId(msgId);
+            context.setTransactionId(msg.getTransactionId());
+            context.setTransactionState(state);
+            context.setFromTransactionCheck(fromTransactionCheck);
+            executeEndTransactionHook(context);
+        }
+    }
     /**
      * DEFAULT ONEWAY -------------------------------------------------------
      */
@@ -1266,7 +1305,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         }
 
         try {
-            this.endTransaction(sendResult, localTransactionState, localException);
+            this.endTransaction(msg, sendResult, localTransactionState, localException);
         } catch (Exception e) {
             log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
         }
@@ -1290,6 +1329,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public void endTransaction(
+        final Message msg,
         final SendResult sendResult,
         final LocalTransactionState localTransactionState,
         final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
@@ -1318,6 +1358,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 break;
         }
 
+        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
         requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
         requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
         requestHeader.setMsgId(sendResult.getMsgId());
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 24caf14..1c4a931 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
 import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
@@ -167,6 +168,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
                 traceDispatcher = dispatcher;
                 this.defaultMQProducerImpl.registerSendMessageHook(
                     new SendMessageTraceHookImpl(traceDispatcher));
+                this.defaultMQProducerImpl.registerEndTransactionHook(
+                    new EndTransactionTraceHookImpl(traceDispatcher));
             } catch (Throwable e) {
                 log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
             }
@@ -252,6 +255,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
                 traceDispatcher = dispatcher;
                 this.getDefaultMQProducerImpl().registerSendMessageHook(
                     new SendMessageTraceHookImpl(traceDispatcher));
+                this.defaultMQProducerImpl.registerEndTransactionHook(
+                    new EndTransactionTraceHookImpl(traceDispatcher));
             } catch (Throwable e) {
                 log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
             }
@@ -916,24 +921,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
     }
-    
+
     @Override
     public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
     }
-    
+
     @Override
     public void send(Collection<Message> msgs, SendCallback sendCallback,
         long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
     }
-    
+
     @Override
     public void send(Collection<Message> msgs, MessageQueue mq,
         SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
     }
-    
+
     @Override
     public void send(Collection<Message> msgs, MessageQueue mq,
         SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index 63b512d..4eb758d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -51,6 +51,10 @@ public class TransactionMQProducer extends DefaultMQProducer {
         super(namespace, producerGroup, rpcHook);
     }
 
+    public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
+        super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
+    }
+
     @Override
     public void start() throws MQClientException {
         this.defaultMQProducerImpl.initTransactionEnv();
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
index f93aa38..70c147e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageType;
 
@@ -32,7 +33,9 @@ public class TraceBean {
     private int retryTimes;
     private int bodyLength;
     private MessageType msgType;
-
+    private LocalTransactionState transactionState;
+    private String transactionId;
+    private boolean fromTransactionCheck;
 
     public MessageType getMsgType() {
         return msgType;
@@ -141,4 +144,28 @@ public class TraceBean {
     public void setBodyLength(int bodyLength) {
         this.bodyLength = bodyLength;
     }
+
+    public LocalTransactionState getTransactionState() {
+        return transactionState;
+    }
+
+    public void setTransactionState(LocalTransactionState transactionState) {
+        this.transactionState = transactionState;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public boolean isFromTransactionCheck() {
+        return fromTransactionCheck;
+    }
+
+    public void setFromTransactionCheck(boolean fromTransactionCheck) {
+        this.fromTransactionCheck = fromTransactionCheck;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index acf0dea..b2b0645 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.common.message.MessageType;
 
 import java.util.ArrayList;
@@ -109,6 +110,27 @@ public class TraceDataEncoder {
                     subAfterContext.setGroupName(line[8]);
                 }
                 resList.add(subAfterContext);
+            } else if (line[0].equals(TraceType.EndTransaction.name())) {
+                TraceContext endTransactionContext = new TraceContext();
+                endTransactionContext.setTraceType(TraceType.EndTransaction);
+                endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
+                endTransactionContext.setRegionId(line[2]);
+                endTransactionContext.setGroupName(line[3]);
+                TraceBean bean = new TraceBean();
+                bean.setTopic(line[4]);
+                bean.setMsgId(line[5]);
+                bean.setTags(line[6]);
+                bean.setKeys(line[7]);
+                bean.setStoreHost(line[8]);
+                bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
+                bean.setClientHost(line[10]);
+                bean.setTransactionId(line[11]);
+                bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
+                bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
+
+                endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
+                endTransactionContext.getTraceBeans().add(bean);
+                resList.add(endTransactionContext);
             }
         }
         return resList;
@@ -173,9 +195,26 @@ public class TraceDataEncoder {
                         .append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
                         .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
                         .append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
-                    
+
                 }
             }
+            case EndTransaction: {
+                TraceBean bean = ctx.getTraceBeans().get(0);
+                sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
+            }
             break;
             default:
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
index 79b19c1..8870ddc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
@@ -20,4 +20,5 @@ public enum TraceType {
     Pub,
     SubBefore,
     SubAfter,
+    EndTransaction,
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java
new file mode 100644
index 0000000..cbd755b
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceBean;
+import org.apache.rocketmq.client.trace.TraceContext;
+import org.apache.rocketmq.client.trace.TraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceType;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+
+import java.util.ArrayList;
+
+public class EndTransactionTraceHookImpl implements EndTransactionHook {
+
+    private TraceDispatcher localDispatcher;
+
+    public EndTransactionTraceHookImpl(TraceDispatcher localDispatcher) {
+        this.localDispatcher = localDispatcher;
+    }
+
+    @Override
+    public String hookName() {
+        return "EndTransactionTraceHook";
+    }
+
+    @Override
+    public void endTransaction(EndTransactionContext context) {
+        //if it is message trace data,then it doesn't recorded
+        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
+            return;
+        }
+        Message msg = context.getMessage();
+        //build the context content of TuxeTraceContext
+        TraceContext tuxeContext = new TraceContext();
+        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
+        tuxeContext.setTraceType(TraceType.EndTransaction);
+        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
+        //build the data bean object of message trace
+        TraceBean traceBean = new TraceBean();
+        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
+        traceBean.setTags(context.getMessage().getTags());
+        traceBean.setKeys(context.getMessage().getKeys());
+        traceBean.setStoreHost(context.getBrokerAddr());
+        traceBean.setMsgType(MessageType.Trans_msg_Commit);
+        traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
+        traceBean.setMsgId(context.getMsgId());
+        traceBean.setTransactionState(context.getTransactionState());
+        traceBean.setTransactionId(context.getTransactionId());
+        traceBean.setFromTransactionCheck(context.isFromTransactionCheck());
+        String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
+        if (regionId == null || regionId.isEmpty()) {
+            regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+        }
+        tuxeContext.setRegionId(regionId);
+        tuxeContext.getTraceBeans().add(traceBean);
+        tuxeContext.setTimeStamp(System.currentTimeMillis());
+        localDispatcher.append(tuxeContext);
+    }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
index 249a0d1..bac12ea 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageType;
 import org.junit.Assert;
@@ -90,4 +91,46 @@ public class TraceDataEncoderTest {
         Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
     }
 
-}
\ No newline at end of file
+    @Test
+    public void testEncoderFromContextBean_EndTransaction() {
+        TraceContext context = new TraceContext();
+        context.setTraceType(TraceType.EndTransaction);
+        context.setGroupName("PID-test");
+        context.setRegionId("DefaultRegion");
+        context.setTimeStamp(time);
+        TraceBean traceBean = new TraceBean();
+        traceBean.setTopic("topic-test");
+        traceBean.setKeys("Keys");
+        traceBean.setTags("Tags");
+        traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+        traceBean.setStoreHost("127.0.0.1:10911");
+        traceBean.setClientHost("127.0.0.1@41700");
+        traceBean.setMsgType(MessageType.Trans_msg_Commit);
+        traceBean.setTransactionId("transactionId");
+        traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
+        traceBean.setFromTransactionCheck(false);
+        List<TraceBean> traceBeans = new ArrayList<TraceBean>();
+        traceBeans.add(traceBean);
+        context.setTraceBeans(traceBeans);
+        TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(context);
+
+        Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
+        String traceData = traceTransferBean.getTransData();
+        TraceContext contextAfter = TraceDataEncoder.decoderFromTraceDataString(traceData).get(0);
+        Assert.assertEquals(context.getTraceType(), contextAfter.getTraceType());
+        Assert.assertEquals(context.getTimeStamp(), contextAfter.getTimeStamp());
+        Assert.assertEquals(context.getGroupName(), contextAfter.getGroupName());
+        TraceBean before = context.getTraceBeans().get(0);
+        TraceBean after = contextAfter.getTraceBeans().get(0);
+        Assert.assertEquals(before.getTopic(), after.getTopic());
+        Assert.assertEquals(before.getMsgId(), after.getMsgId());
+        Assert.assertEquals(before.getTags(), after.getTags());
+        Assert.assertEquals(before.getKeys(), after.getKeys());
+        Assert.assertEquals(before.getStoreHost(), after.getStoreHost());
+        Assert.assertEquals(before.getMsgType(), after.getMsgType());
+        Assert.assertEquals(before.getClientHost(), after.getClientHost());
+        Assert.assertEquals(before.getTransactionId(), after.getTransactionId());
+        Assert.assertEquals(before.getTransactionState(), after.getTransactionState());
+        Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck());
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
new file mode 100644
index 0000000..f838817
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransactionMQProducerWithTraceTest {
+
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    @Mock
+    private EndTransactionHook endTransactionHook;
+
+    private AsyncTraceDispatcher asyncTraceDispatcher;
+
+    private TransactionMQProducer producer;
+    private DefaultMQProducer traceProducer;
+
+    private Message message;
+    private String topic = "FooBar";
+    private String producerGroupPrefix = "FooBar_PID";
+    private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+    private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
+    private String customerTraceTopic = "rmq_trace_topic_12345";
+
+    @Before
+    public void init() throws Exception {
+        TransactionListener transactionListener = new TransactionListener() {
+            @Override
+            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+                return LocalTransactionState.COMMIT_MESSAGE;
+            }
+
+            @Override
+            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+                return LocalTransactionState.COMMIT_MESSAGE;
+            }
+        };
+        producer = new TransactionMQProducer(null, producerGroupTemp, null, true, null);
+        producer.setTransactionListener(transactionListener);
+
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        message = new Message(topic, new byte[] {'a', 'b', 'c'});
+        asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
+        traceProducer = asyncTraceDispatcher.getTraceProducer();
+
+        producer.start();
+
+        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+
+        Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        fieldTrace.setAccessible(true);
+        fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+
+        Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
+        fieldHooks.setAccessible(true);
+        List<EndTransactionHook>hooks = new ArrayList<>();
+        hooks.add(endTransactionHook);
+        fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
+
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+
+    }
+
+    @Test
+    public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        AtomicReference<EndTransactionContext> context = new AtomicReference<>();
+        doAnswer(mock -> {
+            context.set(mock.getArgument(0));
+            return null;
+        }).when(endTransactionHook).endTransaction(any());
+        producer.sendMessageInTransaction(message, null);
+
+        EndTransactionContext ctx = context.get();
+        assertThat(ctx.getProducerGroup()).isEqualTo(producerGroupTemp);
+        assertThat(ctx.getMsgId()).isEqualTo("123");
+        assertThat(ctx.isFromTransactionCheck()).isFalse();
+        assertThat(new String(ctx.getMessage().getBody())).isEqualTo(new String(message.getBody()));
+        assertThat(ctx.getMessage().getTopic()).isEqualTo(topic);
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown();
+    }
+
+    public static TopicRouteData createTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSysFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("123");
+        sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1));
+        sendResult.setQueueOffset(456);
+        sendResult.setSendStatus(sendStatus);
+        sendResult.setRegionId("HZ");
+        sendResult.setMessageQueue(new MessageQueue(topic, "broker-trace", 0));
+        return sendResult;
+    }
+
+}