You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/04/26 10:28:23 UTC
[rocketmq] branch develop updated: [ISSUE #2833] Support trace for
TranscationProducer (#2834)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c3d4641 [ISSUE #2833] Support trace for TranscationProducer (#2834)
c3d4641 is described below
commit c3d464108e7c099d3438debbab75e86ffd5f036c
Author: yuz10 <84...@qq.com>
AuthorDate: Mon Apr 26 18:28:10 2021 +0800
[ISSUE #2833] Support trace for TranscationProducer (#2834)
---
.../client/hook/EndTransactionContext.java | 86 +++++++++
.../EndTransactionHook.java} | 10 +-
.../impl/producer/DefaultMQProducerImpl.java | 43 ++++-
.../client/producer/DefaultMQProducer.java | 13 +-
.../client/producer/TransactionMQProducer.java | 4 +
.../apache/rocketmq/client/trace/TraceBean.java | 29 ++-
.../rocketmq/client/trace/TraceDataEncoder.java | 41 ++++-
.../apache/rocketmq/client/trace/TraceType.java | 1 +
.../trace/hook/EndTransactionTraceHookImpl.java | 81 ++++++++
.../client/trace/TraceDataEncoderTest.java | 45 ++++-
.../trace/TransactionMQProducerWithTraceTest.java | 203 +++++++++++++++++++++
11 files changed, 543 insertions(+), 13 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java
new file mode 100644
index 0000000..5271ade
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.common.message.Message;
+
+public class EndTransactionContext {
+ private String producerGroup;
+ private Message message;
+ private String brokerAddr;
+ private String msgId;
+ private String transactionId;
+ private LocalTransactionState transactionState;
+ private boolean fromTransactionCheck;
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public LocalTransactionState getTransactionState() {
+ return transactionState;
+ }
+
+ public void setTransactionState(LocalTransactionState transactionState) {
+ this.transactionState = transactionState;
+ }
+
+ public boolean isFromTransactionCheck() {
+ return fromTransactionCheck;
+ }
+
+ public void setFromTransactionCheck(boolean fromTransactionCheck) {
+ this.fromTransactionCheck = fromTransactionCheck;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java
similarity index 82%
copy from client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
copy to client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java
index 79b19c1..834cb27 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.trace;
+package org.apache.rocketmq.client.hook;
-public enum TraceType {
- Pub,
- SubBefore,
- SubAfter,
+public interface EndTransactionHook {
+ String hookName();
+
+ void endTransaction(final EndTransactionContext context);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2f9146d..fac3ed3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -44,6 +44,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
+ private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
@@ -171,6 +174,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
log.info("register sendMessage Hook, {}", hook.hookName());
}
+ public void registerEndTransactionHook(final EndTransactionHook hook) {
+ this.endTransactionHookList.add(hook);
+ log.info("register endTransaction Hook, {}", hook.hookName());
+ }
+
public void start() throws MQClientException {
this.start(true);
}
@@ -386,6 +394,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
+ doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
@@ -967,6 +976,36 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
+ public boolean hasEndTransactionHook() {
+ return !this.endTransactionHookList.isEmpty();
+ }
+
+ public void executeEndTransactionHook(final EndTransactionContext context) {
+ if (!this.endTransactionHookList.isEmpty()) {
+ for (EndTransactionHook hook : this.endTransactionHookList) {
+ try {
+ hook.endTransaction(context);
+ } catch (Throwable e) {
+ log.warn("failed to executeEndTransactionHook", e);
+ }
+ }
+ }
+ }
+
+ public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state,
+ boolean fromTransactionCheck) {
+ if (hasEndTransactionHook()) {
+ EndTransactionContext context = new EndTransactionContext();
+ context.setProducerGroup(defaultMQProducer.getProducerGroup());
+ context.setBrokerAddr(brokerAddr);
+ context.setMessage(msg);
+ context.setMsgId(msgId);
+ context.setTransactionId(msg.getTransactionId());
+ context.setTransactionState(state);
+ context.setFromTransactionCheck(fromTransactionCheck);
+ executeEndTransactionHook(context);
+ }
+ }
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
@@ -1266,7 +1305,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
try {
- this.endTransaction(sendResult, localTransactionState, localException);
+ this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
@@ -1290,6 +1329,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void endTransaction(
+ final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
@@ -1318,6 +1358,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break;
}
+ doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 24caf14..1c4a931 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
@@ -167,6 +168,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQProducerImpl.registerEndTransactionHook(
+ new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
@@ -252,6 +255,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQProducerImpl.registerEndTransactionHook(
+ new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
@@ -916,24 +921,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}
-
+
@Override
public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}
-
+
@Override
public void send(Collection<Message> msgs, SendCallback sendCallback,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
}
-
+
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
}
-
+
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index 63b512d..4eb758d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -51,6 +51,10 @@ public class TransactionMQProducer extends DefaultMQProducer {
super(namespace, producerGroup, rpcHook);
}
+ public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
+ super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
+ }
+
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
index f93aa38..70c147e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
@@ -32,7 +33,9 @@ public class TraceBean {
private int retryTimes;
private int bodyLength;
private MessageType msgType;
-
+ private LocalTransactionState transactionState;
+ private String transactionId;
+ private boolean fromTransactionCheck;
public MessageType getMsgType() {
return msgType;
@@ -141,4 +144,28 @@ public class TraceBean {
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
+
+ public LocalTransactionState getTransactionState() {
+ return transactionState;
+ }
+
+ public void setTransactionState(LocalTransactionState transactionState) {
+ this.transactionState = transactionState;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public boolean isFromTransactionCheck() {
+ return fromTransactionCheck;
+ }
+
+ public void setFromTransactionCheck(boolean fromTransactionCheck) {
+ this.fromTransactionCheck = fromTransactionCheck;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index acf0dea..b2b0645 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageType;
import java.util.ArrayList;
@@ -109,6 +110,27 @@ public class TraceDataEncoder {
subAfterContext.setGroupName(line[8]);
}
resList.add(subAfterContext);
+ } else if (line[0].equals(TraceType.EndTransaction.name())) {
+ TraceContext endTransactionContext = new TraceContext();
+ endTransactionContext.setTraceType(TraceType.EndTransaction);
+ endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
+ endTransactionContext.setRegionId(line[2]);
+ endTransactionContext.setGroupName(line[3]);
+ TraceBean bean = new TraceBean();
+ bean.setTopic(line[4]);
+ bean.setMsgId(line[5]);
+ bean.setTags(line[6]);
+ bean.setKeys(line[7]);
+ bean.setStoreHost(line[8]);
+ bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
+ bean.setClientHost(line[10]);
+ bean.setTransactionId(line[11]);
+ bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
+ bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
+
+ endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
+ endTransactionContext.getTraceBeans().add(bean);
+ resList.add(endTransactionContext);
}
}
return resList;
@@ -173,9 +195,26 @@ public class TraceDataEncoder {
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
-
+
}
}
+ case EndTransaction: {
+ TraceBean bean = ctx.getTraceBeans().get(0);
+ sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
+ .append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
+ }
break;
default:
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
index 79b19c1..8870ddc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
@@ -20,4 +20,5 @@ public enum TraceType {
Pub,
SubBefore,
SubAfter,
+ EndTransaction,
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java
new file mode 100644
index 0000000..cbd755b
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceBean;
+import org.apache.rocketmq.client.trace.TraceContext;
+import org.apache.rocketmq.client.trace.TraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceType;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+
+import java.util.ArrayList;
+
+public class EndTransactionTraceHookImpl implements EndTransactionHook {
+
+ private TraceDispatcher localDispatcher;
+
+ public EndTransactionTraceHookImpl(TraceDispatcher localDispatcher) {
+ this.localDispatcher = localDispatcher;
+ }
+
+ @Override
+ public String hookName() {
+ return "EndTransactionTraceHook";
+ }
+
+ @Override
+ public void endTransaction(EndTransactionContext context) {
+ //if it is message trace data,then it doesn't recorded
+ if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
+ return;
+ }
+ Message msg = context.getMessage();
+ //build the context content of TuxeTraceContext
+ TraceContext tuxeContext = new TraceContext();
+ tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
+ tuxeContext.setTraceType(TraceType.EndTransaction);
+ tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
+ //build the data bean object of message trace
+ TraceBean traceBean = new TraceBean();
+ traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
+ traceBean.setTags(context.getMessage().getTags());
+ traceBean.setKeys(context.getMessage().getKeys());
+ traceBean.setStoreHost(context.getBrokerAddr());
+ traceBean.setMsgType(MessageType.Trans_msg_Commit);
+ traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
+ traceBean.setMsgId(context.getMsgId());
+ traceBean.setTransactionState(context.getTransactionState());
+ traceBean.setTransactionId(context.getTransactionId());
+ traceBean.setFromTransactionCheck(context.isFromTransactionCheck());
+ String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
+ if (regionId == null || regionId.isEmpty()) {
+ regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+ }
+ tuxeContext.setRegionId(regionId);
+ tuxeContext.getTraceBeans().add(traceBean);
+ tuxeContext.setTimeStamp(System.currentTimeMillis());
+ localDispatcher.append(tuxeContext);
+ }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
index 249a0d1..bac12ea 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
@@ -90,4 +91,46 @@ public class TraceDataEncoderTest {
Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
}
-}
\ No newline at end of file
+ @Test
+ public void testEncoderFromContextBean_EndTransaction() {
+ TraceContext context = new TraceContext();
+ context.setTraceType(TraceType.EndTransaction);
+ context.setGroupName("PID-test");
+ context.setRegionId("DefaultRegion");
+ context.setTimeStamp(time);
+ TraceBean traceBean = new TraceBean();
+ traceBean.setTopic("topic-test");
+ traceBean.setKeys("Keys");
+ traceBean.setTags("Tags");
+ traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ traceBean.setStoreHost("127.0.0.1:10911");
+ traceBean.setClientHost("127.0.0.1@41700");
+ traceBean.setMsgType(MessageType.Trans_msg_Commit);
+ traceBean.setTransactionId("transactionId");
+ traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
+ traceBean.setFromTransactionCheck(false);
+ List<TraceBean> traceBeans = new ArrayList<TraceBean>();
+ traceBeans.add(traceBean);
+ context.setTraceBeans(traceBeans);
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(context);
+
+ Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
+ String traceData = traceTransferBean.getTransData();
+ TraceContext contextAfter = TraceDataEncoder.decoderFromTraceDataString(traceData).get(0);
+ Assert.assertEquals(context.getTraceType(), contextAfter.getTraceType());
+ Assert.assertEquals(context.getTimeStamp(), contextAfter.getTimeStamp());
+ Assert.assertEquals(context.getGroupName(), contextAfter.getGroupName());
+ TraceBean before = context.getTraceBeans().get(0);
+ TraceBean after = contextAfter.getTraceBeans().get(0);
+ Assert.assertEquals(before.getTopic(), after.getTopic());
+ Assert.assertEquals(before.getMsgId(), after.getMsgId());
+ Assert.assertEquals(before.getTags(), after.getTags());
+ Assert.assertEquals(before.getKeys(), after.getKeys());
+ Assert.assertEquals(before.getStoreHost(), after.getStoreHost());
+ Assert.assertEquals(before.getMsgType(), after.getMsgType());
+ Assert.assertEquals(before.getClientHost(), after.getClientHost());
+ Assert.assertEquals(before.getTransactionId(), after.getTransactionId());
+ Assert.assertEquals(before.getTransactionState(), after.getTransactionState());
+ Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck());
+ }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
new file mode 100644
index 0000000..f838817
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransactionMQProducerWithTraceTest {
+
+ @Spy
+ private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+ @Mock
+ private MQClientAPIImpl mQClientAPIImpl;
+ @Mock
+ private EndTransactionHook endTransactionHook;
+
+ private AsyncTraceDispatcher asyncTraceDispatcher;
+
+ private TransactionMQProducer producer;
+ private DefaultMQProducer traceProducer;
+
+ private Message message;
+ private String topic = "FooBar";
+ private String producerGroupPrefix = "FooBar_PID";
+ private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+ private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
+ private String customerTraceTopic = "rmq_trace_topic_12345";
+
+ @Before
+ public void init() throws Exception {
+ TransactionListener transactionListener = new TransactionListener() {
+ @Override
+ public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+
+ @Override
+ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+ };
+ producer = new TransactionMQProducer(null, producerGroupTemp, null, true, null);
+ producer.setTransactionListener(transactionListener);
+
+ producer.setNamesrvAddr("127.0.0.1:9876");
+ message = new Message(topic, new byte[] {'a', 'b', 'c'});
+ asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
+ traceProducer = asyncTraceDispatcher.getTraceProducer();
+
+ producer.start();
+
+ Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+
+ Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+ fieldTrace.setAccessible(true);
+ fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
+
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQClientAPIImpl);
+
+ producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+
+ Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
+ fieldHooks.setAccessible(true);
+ List<EndTransactionHook>hooks = new ArrayList<>();
+ hooks.add(endTransactionHook);
+ fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
+
+ when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+ nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+ when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+ nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+ .thenReturn(createSendResult(SendStatus.SEND_OK));
+
+ }
+
+ @Test
+ public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ AtomicReference<EndTransactionContext> context = new AtomicReference<>();
+ doAnswer(mock -> {
+ context.set(mock.getArgument(0));
+ return null;
+ }).when(endTransactionHook).endTransaction(any());
+ producer.sendMessageInTransaction(message, null);
+
+ EndTransactionContext ctx = context.get();
+ assertThat(ctx.getProducerGroup()).isEqualTo(producerGroupTemp);
+ assertThat(ctx.getMsgId()).isEqualTo("123");
+ assertThat(ctx.isFromTransactionCheck()).isFalse();
+ assertThat(new String(ctx.getMessage().getBody())).isEqualTo(new String(message.getBody()));
+ assertThat(ctx.getMessage().getTopic()).isEqualTo(topic);
+ }
+
+ @After
+ public void terminate() {
+ producer.shutdown();
+ }
+
+ public static TopicRouteData createTopicRoute() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+
+ topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+ List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName("BrokerA");
+ brokerData.setCluster("DefaultCluster");
+ HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+ brokerAddrs.put(0L, "127.0.0.1:10911");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerDataList.add(brokerData);
+ topicRouteData.setBrokerDatas(brokerDataList);
+
+ List<QueueData> queueDataList = new ArrayList<QueueData>();
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName("BrokerA");
+ queueData.setPerm(6);
+ queueData.setReadQueueNums(3);
+ queueData.setWriteQueueNums(4);
+ queueData.setTopicSysFlag(0);
+ queueDataList.add(queueData);
+ topicRouteData.setQueueDatas(queueDataList);
+ return topicRouteData;
+ }
+
+ private SendResult createSendResult(SendStatus sendStatus) {
+ SendResult sendResult = new SendResult();
+ sendResult.setMsgId("123");
+ sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1));
+ sendResult.setQueueOffset(456);
+ sendResult.setSendStatus(sendStatus);
+ sendResult.setRegionId("HZ");
+ sendResult.setMessageQueue(new MessageQueue(topic, "broker-trace", 0));
+ return sendResult;
+ }
+
+}