You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/05/10 08:21:47 UTC

[rocketmq] branch develop updated: Support OpenTracing(#2861)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f2a5a74  Support OpenTracing(#2861)
f2a5a74 is described below

commit f2a5a747fbf248776340f4a99118d421de2a5310
Author: yuz10 <84...@qq.com>
AuthorDate: Mon May 10 16:21:34 2021 +0800

    Support OpenTracing(#2861)
---
 client/pom.xml                                     |  12 ++
 .../rocketmq/client/trace/TraceConstants.java      |  16 ++
 .../hook/ConsumeMessageOpenTracingHookImpl.java    |  95 +++++++++
 .../hook/EndTransactionOpenTracingHookImpl.java    |  72 +++++++
 .../trace/hook/SendMessageOpenTracingHookImpl.java |  88 ++++++++
 .../DefaultMQConsumerWithOpenTracingTest.java      | 230 +++++++++++++++++++++
 .../DefaultMQProducerWithOpenTracingTest.java      | 170 +++++++++++++++
 .../TransactionMQProducerWithOpenTracingTest.java  | 189 +++++++++++++++++
 example/pom.xml                                    |  10 +
 .../example/tracemessage/OpenTracingProducer.java  |  68 ++++++
 .../tracemessage/OpenTracingPushConsumer.java      |  71 +++++++
 .../OpenTracingTransactionProducer.java            |  86 ++++++++
 12 files changed, 1107 insertions(+)

diff --git a/client/pom.xml b/client/pom.xml
index 0a2fe2d..164082c 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -48,6 +48,18 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.opentracing</groupId>
+            <artifactId>opentracing-api</artifactId>
+            <version>0.33.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.opentracing</groupId>
+            <artifactId>opentracing-mock</artifactId>
+            <version>0.33.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
             <scope>test</scope>
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index 27622cd..1ad4b61 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -25,4 +25,20 @@ public class TraceConstants {
     public static final char FIELD_SPLITOR = (char) 2;
     public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
     public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
+    // Span operation names: "To_<topic>" for sends, "From_<topic>" for consumption,
+    // and a fixed name for the end-of-transaction span.
+    public static final String TO_PREFIX = "To_";
+    public static final String FROM_PREFIX = "From_";
+    public static final String END_TRANSACTION = "EndTransaction";
+    // Value the OpenTracing hooks report for the peer.service tag.
+    public static final String ROCKETMQ_SERVICE = "rocketmq";
+    // Span tag keys. All keys share the "rocketmq." prefix and snake_case suffixes.
+    public static final String ROCKETMQ_SUCCESS = "rocketmq.success";
+    public static final String ROCKETMQ_TAGS = "rocketmq.tags";
+    public static final String ROCKETMQ_KEYS = "rocketmq.keys";
+    // NOTE: the constant name is misspelled ("SOTRE"); it is kept because callers reference it.
+    public static final String ROCKETMQ_SOTRE_HOST = "rocketmq.store_host";
+    public static final String ROCKETMQ_BODY_LENGTH = "rocketmq.body_length";
+    // The key values previously read "rocketmq.mgs_id" / "rocketmq.mgs_type" (typo for "msg").
+    public static final String ROCKETMQ_MSG_ID = "rocketmq.msg_id";
+    public static final String ROCKETMQ_MSG_TYPE = "rocketmq.msg_type";
+    public static final String ROCKETMQ_REGION_ID = "rocketmq.region_id";
+    public static final String ROCKETMQ_TRANSACTION_ID = "rocketmq.transaction_id";
+    public static final String ROCKETMQ_TRANSACTION_STATE = "rocketmq.transaction_state";
+    public static final String ROCKETMQ_IS_FROM_TRANSACTION_CHECK = "rocketmq.is_from_transaction_check";
+    // NOTE: the constant name says "TIMERS" but the key is "retry_times"; name kept for callers.
+    public static final String ROCKETMQ_RETRY_TIMERS = "rocketmq.retry_times";
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
new file mode 100644
index 0000000..28fccae
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.trace.TraceConstants;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class ConsumeMessageOpenTracingHookImpl implements ConsumeMessageHook {
+
+    private Tracer tracer;
+
+    public ConsumeMessageOpenTracingHookImpl(Tracer tracer) {
+        this.tracer = tracer;
+    }
+
+    @Override
+    public String hookName() {
+        return "ConsumeMessageOpenTracingHook";
+    }
+
+    @Override
+    public void consumeMessageBefore(ConsumeMessageContext context) {
+        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+            return;
+        }
+        List<Span> spanList = new ArrayList<>();
+        for (MessageExt msg : context.getMsgList()) {
+            if (msg == null) {
+                continue;
+            }
+            Tracer.SpanBuilder spanBuilder = tracer
+                    .buildSpan(TraceConstants.FROM_PREFIX + msg.getTopic())
+                    .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER);
+            SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+            if (spanContext != null) {
+                spanBuilder.asChildOf(spanContext);
+            }
+            Span span = spanBuilder.start();
+
+            span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
+            span.setTag(Tags.MESSAGE_BUS_DESTINATION, NamespaceUtil.withoutNamespace(msg.getTopic()));
+            span.setTag(TraceConstants.ROCKETMQ_MSG_ID, msg.getMsgId());
+            span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
+            span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
+            span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getStoreSize());
+            span.setTag(TraceConstants.ROCKETMQ_RETRY_TIMERS, msg.getReconsumeTimes());
+            span.setTag(TraceConstants.ROCKETMQ_REGION_ID, msg.getProperty(MessageConst.PROPERTY_MSG_REGION));
+            spanList.add(span);
+        }
+        context.setMqTraceContext(spanList);
+    }
+
+    @Override
+    public void consumeMessageAfter(ConsumeMessageContext context) {
+        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+            return;
+        }
+        List<Span> spanList = (List<Span>) context.getMqTraceContext();
+        if (spanList == null) {
+            return;
+        }
+        for (Span span : spanList) {
+            span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.isSuccess());
+            span.finish();
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java
new file mode 100644
index 0000000..62d310f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
+import org.apache.rocketmq.client.trace.TraceConstants;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageType;
+
+/**
+ * {@link EndTransactionHook} that records the end of a transaction as a single,
+ * immediately finished OpenTracing span.
+ */
+public class EndTransactionOpenTracingHookImpl implements EndTransactionHook {
+
+    private final Tracer tracer;
+
+    /**
+     * @param tracer tracer used to extract the parent context and to build the span
+     */
+    public EndTransactionOpenTracingHookImpl(Tracer tracer) {
+        this.tracer = tracer;
+    }
+
+    @Override
+    public String hookName() {
+        return "EndTransactionOpenTracingHook";
+    }
+
+    /**
+     * Builds, tags and finishes an "EndTransaction" producer span for the given context.
+     *
+     * @param context end-transaction context; ignored when null or when it has no message
+     */
+    @Override
+    public void endTransaction(EndTransactionContext context) {
+        // Without a message there is neither a topic nor a parent context to report.
+        if (context == null || context.getMessage() == null) {
+            return;
+        }
+        Message msg = context.getMessage();
+        Tracer.SpanBuilder spanBuilder = tracer
+                .buildSpan(TraceConstants.END_TRANSACTION)
+                .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
+        // Link to the span context carried in the message properties, when there is one.
+        SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+        if (spanContext != null) {
+            spanBuilder.asChildOf(spanContext);
+        }
+
+        Span span = spanBuilder.start();
+        span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
+        span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic());
+        span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
+        span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
+        span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr());
+        span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getMsgId());
+        // The message type is always reported as Trans_msg_Commit; the actual outcome
+        // is carried by the transaction-state tag below.
+        span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, MessageType.Trans_msg_Commit.name());
+        span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_ID, context.getTransactionId());
+        // Guarded so that a missing state cannot throw after the span has been started,
+        // which would leave the span unfinished.
+        if (context.getTransactionState() != null) {
+            span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_STATE, context.getTransactionState().name());
+        }
+        span.setTag(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK, context.isFromTransactionCheck());
+        span.finish();
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
new file mode 100644
index 0000000..60c18a2
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.trace.TraceConstants;
+import org.apache.rocketmq.common.message.Message;
+
+public class SendMessageOpenTracingHookImpl implements SendMessageHook {
+
+    private Tracer tracer;
+
+    public SendMessageOpenTracingHookImpl(Tracer tracer) {
+        this.tracer = tracer;
+    }
+
+    @Override
+    public String hookName() {
+        return "SendMessageOpenTracingHook";
+    }
+
+    @Override
+    public void sendMessageBefore(SendMessageContext context) {
+        if (context == null) {
+            return;
+        }
+        Message msg = context.getMessage();
+        Tracer.SpanBuilder spanBuilder = tracer
+                .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
+                .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
+        SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+        if (spanContext != null) {
+            spanBuilder.asChildOf(spanContext);
+        }
+        Span span = spanBuilder.start();
+        tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+        span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
+        span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic());
+        span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
+        span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
+        span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr());
+        span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, context.getMsgType().name());
+        span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length);
+        context.setMqTraceContext(span);
+    }
+
+    @Override
+    public void sendMessageAfter(SendMessageContext context) {
+        if (context == null || context.getMqTraceContext() == null) {
+            return;
+        }
+        if (context.getSendResult() == null) {
+            return;
+        }
+
+        if (context.getSendResult().getRegionId() == null) {
+            return;
+        }
+
+        Span span = (Span) context.getMqTraceContext();
+        span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK));
+        span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getSendResult().getMsgId());
+        span.setTag(TraceConstants.ROCKETMQ_REGION_ID, context.getSendResult().getRegionId());
+        span.finish();
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
new file mode 100644
index 0000000..16a3d02
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullMessageService;
+import org.apache.rocketmq.client.impl.consumer.PullRequest;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies that a push consumer with {@link ConsumeMessageOpenTracingHookImpl} registered
+ * reports one finished consumer span per consumed message to a {@link MockTracer}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DefaultMQPushConsumerImpl.class)
+@PowerMockIgnore("javax.management.*")
+public class DefaultMQConsumerWithOpenTracingTest {
+    private String consumerGroup;
+
+    private String topic = "FooBar";
+    private String brokerName = "BrokerA";
+    private MQClientInstance mQClientFactory;
+
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    private PullAPIWrapper pullAPIWrapper;
+    private RebalancePushImpl rebalancePushImpl;
+    private DefaultMQPushConsumer pushConsumer;
+    private MockTracer tracer = new MockTracer();
+
+    /**
+     * Starts a push consumer with the OpenTracing hook registered and replaces its client
+     * factory, client API and pull wrapper with spies/mocks so that every pull returns a
+     * single in-memory message.
+     */
+    @Before
+    public void init() throws Exception {
+        // Unique group per run.
+        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
+        pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+        pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
+                new ConsumeMessageOpenTracingHookImpl(tracer));
+        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pushConsumer.setPullInterval(60 * 1000);
+
+        // Placeholder listener; the test installs its own consume service later.
+        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                return null;
+            }
+        });
+
+        PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
+        DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+        rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+        Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, rebalancePushImpl);
+        pushConsumer.subscribe(topic, "*");
+
+        pushConsumer.start();
+
+        // Swap the started consumer's collaborators for spies/mocks via reflection.
+        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
+
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, pullAPIWrapper);
+
+        pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+        mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
+
+        // Every pull answers with one message and invokes the pull callback with it.
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+                anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+                .thenAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(InvocationOnMock mock) throws Throwable {
+                        PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                        MessageClientExt messageClientExt = new MessageClientExt();
+                        messageClientExt.setTopic(topic);
+                        messageClientExt.setQueueId(0);
+                        messageClientExt.setMsgId("123");
+                        messageClientExt.setBody(new byte[]{'a'});
+                        messageClientExt.setOffsetMsgId("234");
+                        messageClientExt.setBornHost(new InetSocketAddress(8080));
+                        messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                        PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                        ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                        return pullResult;
+                    }
+                });
+
+        doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createPullRequest().getMessageQueue());
+        pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+    }
+
+    @After
+    public void terminate() {
+        pushConsumer.shutdown();
+    }
+
+    /**
+     * Consumes one pulled message and checks that exactly one finished span with the
+     * expected destination, span kind and success tags was reported.
+     */
+    @Test
+    public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final MessageExt[] messageExts = new MessageExt[1];
+        pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                messageExts[0] = msgs.get(0);
+                countDownLatch.countDown();
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        }));
+
+        PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        // Fail with a clear assertion on timeout instead of a NullPointerException below.
+        assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
+        assertThat(messageExts[0].getBody()).isEqualTo(new byte[]{'a'});
+
+        // Spans are finished by the consumeMessageAfter hook, which is not synchronized with
+        // the latch released inside the listener; poll for a bounded time before asserting.
+        long deadline = System.currentTimeMillis() + 3000L;
+        while (tracer.finishedSpans().isEmpty() && System.currentTimeMillis() < deadline) {
+            Thread.sleep(10L);
+        }
+
+        assertThat(tracer.finishedSpans().size()).isEqualTo(1);
+        MockSpan span = tracer.finishedSpans().get(0);
+        assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
+        assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
+        assertThat(span.tags().get(TraceConstants.ROCKETMQ_SUCCESS)).isEqualTo(true);
+    }
+
+    /**
+     * Builds a pull request for queue 0 of the test topic on the test broker, starting at
+     * offset 1024 and backed by a freshly locked process queue.
+     */
+    private PullRequest createPullRequest() {
+        PullRequest pullRequest = new PullRequest();
+        pullRequest.setConsumerGroup(consumerGroup);
+        pullRequest.setNextOffset(1024);
+
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName(brokerName);
+        messageQueue.setQueueId(0);
+        messageQueue.setTopic(topic);
+        pullRequest.setMessageQueue(messageQueue);
+        ProcessQueue processQueue = new ProcessQueue();
+        processQueue.setLocked(true);
+        processQueue.setLastLockTimestamp(System.currentTimeMillis());
+        pullRequest.setProcessQueue(processQueue);
+
+        return pullRequest;
+    }
+
+    /**
+     * Encodes the given messages into one binary payload and wraps it in a pull result
+     * whose next offset advances past them.
+     */
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+                                           List<MessageExt> messageExtList) throws Exception {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (MessageExt messageExt : messageExtList) {
+            outputStream.write(MessageDecoder.encode(messageExt, false));
+        }
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
+    }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
new file mode 100644
index 0000000..5d64a93
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQProducerWithOpenTracingTest {
+
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    private DefaultMQProducer producer;
+
+    private Message message;
+    private String topic = "FooBar";
+    private String producerGroupPrefix = "FooBar_PID";
+    private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+    private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
+    private MockTracer tracer = new MockTracer();
+
+    @Before
+    public void init() throws Exception {
+        // Producer under test, with the OpenTracing send hook wired to the in-memory MockTracer.
+        producer = new DefaultMQProducer(producerGroupTemp);
+        producer.getDefaultMQProducerImpl().registerSendMessageHook(
+                new SendMessageOpenTracingHookImpl(tracer));
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        message = new Message(topic, new byte[] {'a', 'b', 'c'});
+        // Start first; the reflection below then swaps the client objects on the started producer.
+        producer.start();
+        // Replace the producer's client factory with the Mockito spy.
+        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+        // Replace the factory's remoting API with the mock so sendMessage can be stubbed.
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+        // Register the producer group on the injected factory.
+        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+        // 8-argument overload runs its real code; 12-argument overload returns a canned SEND_OK result.
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+
+    }
+
+    /**
+     * Sends one message synchronously and verifies that exactly one span is finished and that it
+     * carries the producer-side tags taken from the message, the stubbed send result and the
+     * stubbed topic route.
+     */
+    @Test
+    public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
+        producerImpl.getmQClientFactory().registerProducer(producerGroupTraceTemp, producerImpl);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+
+        producer.send(message);
+
+        List<MockSpan> finished = tracer.finishedSpans();
+        assertThat(finished.size()).isEqualTo(1);
+        MockSpan sendSpan = finished.get(0);
+        assertThat(sendSpan.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
+        assertThat(sendSpan.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_PRODUCER);
+        assertThat(sendSpan.tags().get(TraceConstants.ROCKETMQ_MSG_ID)).isEqualTo("123");
+        assertThat(sendSpan.tags().get(TraceConstants.ROCKETMQ_BODY_LENGTH)).isEqualTo(3);
+        assertThat(sendSpan.tags().get(TraceConstants.ROCKETMQ_REGION_ID)).isEqualTo("HZ");
+        assertThat(sendSpan.tags().get(TraceConstants.ROCKETMQ_MSG_TYPE)).isEqualTo(MessageType.Normal_Msg.name());
+        assertThat(sendSpan.tags().get(TraceConstants.ROCKETMQ_SOTRE_HOST)).isEqualTo("127.0.0.1:10911");
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown(); // stop the producer that init() started, after every test
+    }
+
+    /**
+     * Creates the canned route that the tests return from the stubbed name-server lookup: one
+     * broker "BrokerA" in "DefaultCluster" at 127.0.0.1:10911 with 3 read and 4 write queues.
+     *
+     * @return route data with one broker entry, one queue entry and an empty filter-server table
+     */
+    public static TopicRouteData createTopicRoute() {
+        HashMap<Long, String> addressesById = new HashMap<Long, String>();
+        addressesById.put(0L, "127.0.0.1:10911");
+
+        BrokerData broker = new BrokerData();
+        broker.setBrokerName("BrokerA");
+        broker.setCluster("DefaultCluster");
+        broker.setBrokerAddrs(addressesById);
+        List<BrokerData> brokers = new ArrayList<BrokerData>();
+        brokers.add(broker);
+
+        QueueData queue = new QueueData();
+        queue.setBrokerName("BrokerA");
+        queue.setPerm(6);
+        queue.setReadQueueNums(3);
+        queue.setWriteQueueNums(4);
+        queue.setTopicSysFlag(0);
+        List<QueueData> queues = new ArrayList<QueueData>();
+        queues.add(queue);
+
+        TopicRouteData route = new TopicRouteData();
+        route.setFilterServerTable(new HashMap<String, List<String>>());
+        route.setBrokerDatas(brokers);
+        route.setQueueDatas(queues);
+        return route;
+    }
+
+    /**
+     * Creates the canned send result used by the stubbed {@code sendMessage} call.
+     *
+     * @param sendStatus status to report in the result
+     * @return a result with message id and offset message id "123", queue offset 456 and region "HZ"
+     */
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult result = new SendResult();
+        result.setSendStatus(sendStatus);
+        result.setMsgId("123");
+        result.setOffsetMsgId("123");
+        result.setQueueOffset(456);
+        result.setRegionId("HZ");
+        return result;
+    }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
new file mode 100644
index 0000000..dd6d108
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl;
+import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransactionMQProducerWithOpenTracingTest {
+
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    private TransactionMQProducer producer;
+
+    private Message message;
+    private String topic = "FooBar";
+    private String producerGroupPrefix = "FooBar_PID";
+    private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+    private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
+    private MockTracer tracer = new MockTracer();
+    @Before
+    public void init() throws Exception {
+        TransactionListener transactionListener = new TransactionListener() { // commits on both execute and check
+            @Override
+            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+                return LocalTransactionState.COMMIT_MESSAGE;
+            }
+
+            @Override
+            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+                return LocalTransactionState.COMMIT_MESSAGE;
+            }
+        };
+        producer = new TransactionMQProducer(producerGroupTemp);
+        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
+        producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
+        producer.setTransactionListener(transactionListener);
+
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        message = new Message(topic, new byte[] {'a', 'b', 'c'});
+        // Start first; the reflection below then swaps the client objects on the started producer.
+        producer.start();
+        // Replace the producer's client factory with the Mockito spy.
+        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+        // Replace the factory's remoting API with the mock so sendMessage can be stubbed.
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+        // Register the producer group on the injected factory.
+        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+        // 8-argument overload runs its real code; 12-argument overload returns a canned SEND_OK result.
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+
+    }
+
+    /**
+     * Sends one transactional message and verifies that two spans are finished, and that the
+     * second one carries the commit message type, the commit transaction state and the
+     * "not from a transaction check" flag.
+     */
+    @Test
+    public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
+        producerImpl.getmQClientFactory().registerProducer(producerGroupTraceTemp, producerImpl);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+
+        producer.sendMessageInTransaction(message, null);
+
+        List<MockSpan> finished = tracer.finishedSpans();
+        assertThat(finished.size()).isEqualTo(2);
+        MockSpan secondSpan = finished.get(1);
+        assertThat(secondSpan.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
+        assertThat(secondSpan.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_PRODUCER);
+        assertThat(secondSpan.tags().get(TraceConstants.ROCKETMQ_MSG_ID)).isEqualTo("123");
+        assertThat(secondSpan.tags().get(TraceConstants.ROCKETMQ_MSG_TYPE)).isEqualTo(MessageType.Trans_msg_Commit.name());
+        assertThat(secondSpan.tags().get(TraceConstants.ROCKETMQ_TRANSACTION_STATE)).isEqualTo(LocalTransactionState.COMMIT_MESSAGE.name());
+        assertThat(secondSpan.tags().get(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK)).isEqualTo(false);
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown(); // stop the producer that init() started, after every test
+    }
+
+    /**
+     * Creates the canned route that the tests return from the stubbed name-server lookup: one
+     * broker "BrokerA" in "DefaultCluster" at 127.0.0.1:10911 with 3 read and 4 write queues.
+     *
+     * @return route data with one broker entry, one queue entry and an empty filter-server table
+     */
+    public static TopicRouteData createTopicRoute() {
+        HashMap<Long, String> addressesById = new HashMap<Long, String>();
+        addressesById.put(0L, "127.0.0.1:10911");
+
+        BrokerData broker = new BrokerData();
+        broker.setBrokerName("BrokerA");
+        broker.setCluster("DefaultCluster");
+        broker.setBrokerAddrs(addressesById);
+        List<BrokerData> brokers = new ArrayList<BrokerData>();
+        brokers.add(broker);
+
+        QueueData queue = new QueueData();
+        queue.setBrokerName("BrokerA");
+        queue.setPerm(6);
+        queue.setReadQueueNums(3);
+        queue.setWriteQueueNums(4);
+        queue.setTopicSysFlag(0);
+        List<QueueData> queues = new ArrayList<QueueData>();
+        queues.add(queue);
+
+        TopicRouteData route = new TopicRouteData();
+        route.setFilterServerTable(new HashMap<String, List<String>>());
+        route.setBrokerDatas(brokers);
+        route.setQueueDatas(queues);
+        return route;
+    }
+
+    /**
+     * Creates the canned send result used by the stubbed {@code sendMessage} call. The offset
+     * message id is generated from 127.0.0.1:12 and the value 1, and the result points at queue 0
+     * of the test topic on broker "broker-trace".
+     *
+     * @param sendStatus status to report in the result
+     * @return a result with message id "123", queue offset 456 and region "HZ"
+     */
+    private SendResult createSendResult(SendStatus sendStatus) {
+        String offsetMsgId = MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1);
+
+        SendResult result = new SendResult();
+        result.setSendStatus(sendStatus);
+        result.setMsgId("123");
+        result.setOffsetMsgId(offsetMsgId);
+        result.setQueueOffset(456);
+        result.setRegionId("HZ");
+        result.setMessageQueue(new MessageQueue(topic, "broker-trace", 0));
+        return result;
+    }
+
+}
diff --git a/example/pom.xml b/example/pom.xml
index abe0b95..568dd80 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -52,5 +52,15 @@
             <groupId>org.javassist</groupId>
             <artifactId>javassist</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.jaegertracing</groupId>
+            <artifactId>jaeger-core</artifactId>
+            <version>1.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.jaegertracing</groupId>
+            <artifactId>jaeger-client</artifactId>
+            <version>1.6.0</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
new file mode 100644
index 0000000..cd9ae27
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class OpenTracingProducer {
+    public static void main(String[] args) throws MQClientException {
+
+        Tracer tracer = initTracer();
+
+        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
+        producer.start();
+
+        try {
+            Message msg = new Message("TopicTest",
+                    "TagA",
+                    "OrderID188",
+                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+            SendResult sendResult = producer.send(msg);
+            System.out.printf("%s%n", sendResult);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        producer.shutdown();
+    }
+
+    /**
+     * Builds a Jaeger configuration named "rocketmq" with a constant sampler (param 1) and span
+     * logging enabled, registers its tracer as the global tracer if none is registered yet, and
+     * returns the configuration's tracer.
+     *
+     * @return the tracer obtained from the Jaeger configuration
+     */
+    private static Tracer initTracer() {
+        Configuration.SamplerConfiguration sampler =
+                Configuration.SamplerConfiguration.fromEnv().withType(ConstSampler.TYPE).withParam(1);
+        Configuration.ReporterConfiguration reporter =
+                Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
+        Configuration jaegerConfig = new Configuration("rocketmq").withSampler(sampler).withReporter(reporter);
+
+        GlobalTracer.registerIfAbsent(jaegerConfig.getTracer());
+        return jaegerConfig.getTracer();
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
new file mode 100644
index 0000000..1d5d8a2
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class OpenTracingPushConsumer {
+    /**
+     * Example entry point: starts a push consumer on "TopicTest" (all tags) with the OpenTracing
+     * consume hook installed, and prints every batch of messages it receives.
+     *
+     * @param args unused
+     * @throws InterruptedException declared for interface compatibility
+     * @throws MQClientException    if subscribing or starting the consumer fails
+     */
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        Tracer tracer = initTracer();
+
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+        consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
+
+        consumer.subscribe("TopicTest", "*");
+        // No consume timestamp is configured here: that setting is only consulted for
+        // ConsumeFromWhere.CONSUME_FROM_TIMESTAMP and has no effect with CONSUME_FROM_FIRST_OFFSET.
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+
+    /**
+     * Builds a Jaeger configuration named "rocketmq" with a constant sampler (param 1) and span
+     * logging enabled, registers its tracer as the global tracer if none is registered yet, and
+     * returns the configuration's tracer.
+     *
+     * @return the tracer obtained from the Jaeger configuration
+     */
+    private static Tracer initTracer() {
+        Configuration.SamplerConfiguration sampler =
+                Configuration.SamplerConfiguration.fromEnv().withType(ConstSampler.TYPE).withParam(1);
+        Configuration.ReporterConfiguration reporter =
+                Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
+        Configuration jaegerConfig = new Configuration("rocketmq").withSampler(sampler).withReporter(reporter);
+
+        GlobalTracer.registerIfAbsent(jaegerConfig.getTracer());
+        return jaegerConfig.getTracer();
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
new file mode 100644
index 0000000..428640a
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl;
+import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.UnsupportedEncodingException;
+
+public class OpenTracingTransactionProducer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        Tracer tracer = initTracer();
+
+        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
+        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
+        producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
+
+        producer.setTransactionListener(new TransactionListener() {
+            @Override
+            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+                return LocalTransactionState.COMMIT_MESSAGE;
+            }
+
+            @Override
+            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+                return LocalTransactionState.COMMIT_MESSAGE;
+            }
+        });
+        producer.start();
+
+        try {
+            Message msg = new Message("TopicTest", "Tag", "KEY",
+                    ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
+            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
+            System.out.printf("%s%n", sendResult);
+        } catch (MQClientException | UnsupportedEncodingException e) {
+            e.printStackTrace();
+        }
+
+        for (int i = 0; i < 100000; i++) {
+            Thread.sleep(1000);
+        }
+        producer.shutdown();
+    }
+
+    /**
+     * Builds a Jaeger configuration named "rocketmq" with a constant sampler (param 1) and span
+     * logging enabled, registers its tracer as the global tracer if none is registered yet, and
+     * returns the configuration's tracer.
+     *
+     * @return the tracer obtained from the Jaeger configuration
+     */
+    private static Tracer initTracer() {
+        Configuration.SamplerConfiguration sampler =
+                Configuration.SamplerConfiguration.fromEnv().withType(ConstSampler.TYPE).withParam(1);
+        Configuration.ReporterConfiguration reporter =
+                Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
+        Configuration jaegerConfig = new Configuration("rocketmq").withSampler(sampler).withReporter(reporter);
+
+        GlobalTracer.registerIfAbsent(jaegerConfig.getTracer());
+        return jaegerConfig.getTracer();
+    }
+}