You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/01/10 02:06:51 UTC

[GitHub] dongeforever closed pull request #537: [ISSUE #525] Support the message track

dongeforever closed pull request #537: [ISSUE #525] Support the message track
URL: https://github.com/apache/rocketmq/pull/537
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 73fe43942..9771ec9a9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -228,6 +228,14 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
             return response;
         }
 
+        if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
+            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is user self defined topic and this node is trace broker!";
+            log.warn(errorMsg);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorMsg);
+            return response;
+        }
+
         try {
             response.setCode(ResponseCode.SUCCESS);
             response.setOpaque(request.getOpaque());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 59c7895eb..163897b1c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -124,6 +124,16 @@ public TopicConfigManager(BrokerController brokerController) {
             topicConfig.setWriteQueueNums(1);
             this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
+        {
+            if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
+                String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+                TopicConfig topicConfig = new TopicConfig(topic);
+                this.systemTopicList.add(topic);
+                topicConfig.setReadQueueNums(1);
+                topicConfig.setWriteQueueNums(1);
+                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+            }
+        }
     }
 
     public boolean isSystemTopic(final String topic) {
@@ -154,6 +164,10 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
                     if (topicConfig != null)
                         return topicConfig;
 
+                    if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
+                        return topicConfig;
+                    }
+
                     TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                     if (defaultTopicConfig != null) {
                         if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index d51030a15..0b5ce0521 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -18,6 +18,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
@@ -29,6 +30,12 @@
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType;
+import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -36,6 +43,7 @@
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
@@ -56,6 +64,8 @@
  */
 public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
 
+    private final InternalLogger log = ClientLogger.getLog();
+
     /**
      * Internal implementation. Most of the functions herein are delegated to it.
      */
@@ -246,6 +256,11 @@
      */
     private long consumeTimeout = 15;
 
+    /**
+     * Interface of asynchronous transfer data
+     */
+    private AsyncDispatcher traceDispatcher = null;
+
     /**
      * Default constructor.
      */
@@ -267,6 +282,39 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
     }
 
+    /**
+     * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
+     *
+     * @param consumerGroup Consume queue.
+     * @param rpcHook RPC hook to execute before each remoting command.
+     * @param allocateMessageQueueStrategy message queue allocating algorithm.
+     * @param msgTraceSwitch switch flag instance for message track trace.
+     */
+    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) {
+        this.consumerGroup = consumerGroup;
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
+        if (msgTraceSwitch) {
+            try {
+                Properties tempProperties = new Properties();
+                tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000");
+                tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048");
+                tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
+                tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
+                tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name());
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
+                dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
+                traceDispatcher = dispatcher;
+
+                this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
+                    new ConsumeMessageTraceHookImpl(traceDispatcher));
+            } catch (Throwable e) {
+                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+            }
+        }
+    }
+
     /**
      * Constructor specifying RPC hook.
      *
@@ -276,6 +324,16 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
         this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
     }
 
+    /**
+     * Constructor specifying consumer group.
+     *
+     * @param consumerGroup Consumer group.
+     * @param msgTraceSwitch switch flag instance for message track trace.
+     */
+    public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) {
+        this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch);
+    }
+
     /**
      * Constructor specifying consumer group.
      *
@@ -518,6 +576,15 @@ public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
     @Override
     public void start() throws MQClientException {
         this.defaultMQPushConsumerImpl.start();
+        if (null != traceDispatcher) {
+            try {
+                Properties tempProperties = new Properties();
+                tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
+                traceDispatcher.start(tempProperties);
+            } catch (MQClientException e) {
+                log.warn("trace dispatcher start failed ", e);
+            }
+        }
     }
 
     /**
@@ -526,6 +593,9 @@ public void start() throws MQClientException {
     @Override
     public void shutdown() {
         this.defaultMQPushConsumerImpl.shutdown();
+        if (null != traceDispatcher) {
+            traceDispatcher.shutdown();
+        }
     }
 
     @Override
@@ -694,4 +764,12 @@ public long getConsumeTimeout() {
     public void setConsumeTimeout(final long consumeTimeout) {
         this.consumeTimeout = consumeTimeout;
     }
+
+    public AsyncDispatcher getTraceDispatcher() {
+        return traceDispatcher;
+    }
+
+    public void setTraceDispatcher(AsyncDispatcher traceDispatcher) {
+        this.traceDispatcher = traceDispatcher;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9732d0eb8..a654d7631 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -18,6 +18,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
@@ -25,6 +26,12 @@
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType;
+import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
@@ -33,6 +40,7 @@
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
@@ -56,6 +64,8 @@
  */
 public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
+    private final InternalLogger log = ClientLogger.getLog();
+
     /**
      * Wrapping internal implementations for virtually all methods presented in this class.
      */
@@ -119,11 +129,16 @@
      */
     private int maxMessageSize = 1024 * 1024 * 4; // 4M
 
+    /**
+     * Interface of asynchronous transfer data
+     */
+    private AsyncDispatcher traceDispatcher = null;
+
     /**
      * Default constructor.
      */
     public DefaultMQProducer() {
-        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
+        this(MixAll.DEFAULT_PRODUCER_GROUP, null,false);
     }
 
     /**
@@ -137,6 +152,37 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
     }
 
+    /**
+     * Constructor specifying both producer group and RPC hook.
+     *
+     * @param producerGroup Producer group, see the name-sake field.
+     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param msgTraceSwitch switch flag instance for message track trace.
+     */
+    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) {
+        this.producerGroup = producerGroup;
+        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        //if client open the message track trace feature
+        if (msgTraceSwitch) {
+            try {
+                Properties tempProperties = new Properties();
+                tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000");
+                tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048");
+                tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
+                tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
+                tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name());
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
+                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
+                traceDispatcher = dispatcher;
+
+                this.getDefaultMQProducerImpl().registerSendMessageHook(
+                    new SendMessageTrackHookImpl(traceDispatcher));
+            } catch (Throwable e) {
+                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+            }
+        }
+    }
+
     /**
      * Constructor specifying producer group.
      *
@@ -147,8 +193,18 @@ public DefaultMQProducer(final String producerGroup) {
     }
 
     /**
-     * Constructor specifying the RPC hook.
+     * Constructor specifying producer group.
      *
+     * @param producerGroup Producer group, see the name-sake field.
+     * @param msgTraceSwitch switch flag instance for message track trace.
+     */
+    public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
+        this(producerGroup, null, msgTraceSwitch);
+    }
+
+    /**
+     * Constructor specifying the RPC hook.
+     * 
      * @param rpcHook RPC hook to execute per each remoting command execution.
      */
     public DefaultMQProducer(RPCHook rpcHook) {
@@ -170,6 +226,15 @@ public DefaultMQProducer(RPCHook rpcHook) {
     @Override
     public void start() throws MQClientException {
         this.defaultMQProducerImpl.start();
+        if (null != traceDispatcher) {
+            try {
+                Properties tempProperties = new Properties();
+                tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
+                traceDispatcher.start(tempProperties);
+            } catch (MQClientException e) {
+                log.warn("trace dispatcher start failed ", e);
+            }
+        }
     }
 
     /**
@@ -178,6 +243,9 @@ public void start() throws MQClientException {
     @Override
     public void shutdown() {
         this.defaultMQProducerImpl.shutdown();
+        if (null != traceDispatcher) {
+            traceDispatcher.shutdown();
+        }
     }
 
     /**
@@ -777,4 +845,12 @@ public int getRetryTimesWhenSendAsyncFailed() {
     public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
         this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
     }
+
+    public AsyncDispatcher getTraceDispatcher() {
+        return traceDispatcher;
+    }
+
+    public void setTraceDispatcher(AsyncDispatcher traceDispatcher) {
+        this.traceDispatcher = traceDispatcher;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java
new file mode 100644
index 000000000..220a12811
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageType;
+
+public class TrackTraceBean {
+    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
+    private String topic = "";
+    private String msgId = "";
+    private String offsetMsgId = "";
+    private String tags = "";
+    private String keys = "";
+    private String storeHost = LOCAL_ADDRESS;
+    private String clientHost = LOCAL_ADDRESS;
+    private long storeTime;
+    private int retryTimes;
+    private int bodyLength;
+    private MessageType msgType;
+
+
+    public MessageType getMsgType() {
+        return msgType;
+    }
+
+
+    public void setMsgType(final MessageType msgType) {
+        this.msgType = msgType;
+    }
+
+
+    public String getOffsetMsgId() {
+        return offsetMsgId;
+    }
+
+
+    public void setOffsetMsgId(final String offsetMsgId) {
+        this.offsetMsgId = offsetMsgId;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+
+    public String getTags() {
+        return tags;
+    }
+
+
+    public void setTags(String tags) {
+        this.tags = tags;
+    }
+
+
+    public String getKeys() {
+        return keys;
+    }
+
+
+    public void setKeys(String keys) {
+        this.keys = keys;
+    }
+
+
+    public String getStoreHost() {
+        return storeHost;
+    }
+
+
+    public void setStoreHost(String storeHost) {
+        this.storeHost = storeHost;
+    }
+
+
+    public String getClientHost() {
+        return clientHost;
+    }
+
+
+    public void setClientHost(String clientHost) {
+        this.clientHost = clientHost;
+    }
+
+
+    public long getStoreTime() {
+        return storeTime;
+    }
+
+
+    public void setStoreTime(long storeTime) {
+        this.storeTime = storeTime;
+    }
+
+
+    public int getRetryTimes() {
+        return retryTimes;
+    }
+
+
+    public void setRetryTimes(int retryTimes) {
+        this.retryTimes = retryTimes;
+    }
+
+
+    public int getBodyLength() {
+        return bodyLength;
+    }
+
+
+    public void setBodyLength(int bodyLength) {
+        this.bodyLength = bodyLength;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
new file mode 100644
index 000000000..aa49d1c5b
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+import org.apache.rocketmq.common.MixAll;
+
+public class TrackTraceConstants {
+    public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
+    public static final String ADDRSRV_URL = "ADDRSRV_URL";
+    public static final String INSTANCE_NAME = "InstanceName";
+    public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize";
+    public static final String MAX_BATCH_NUM = "MaxBatchNum";
+    public static final String WAKE_UP_NUM = "WakeUpNum";
+    public static final String MAX_MSG_SIZE = "MaxMsgSize";
+    public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
+    public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+    public static final char CONTENT_SPLITOR = (char) 1;
+    public static final char FIELD_SPLITOR = (char) 2;
+    public static final String TRACE_DISPATCHER_TYPE = "DispatcherType";
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java
new file mode 100644
index 000000000..a6374a6d5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+
+import java.util.List;
+
+/**
+ * The context of Track Trace
+ */
+public class TrackTraceContext implements Comparable<TrackTraceContext> {
+
+    private TrackTraceType traceType;
+    private long timeStamp = System.currentTimeMillis();
+    private String regionId = "";
+    private String regionName = "";
+    private String groupName = "";
+    private int costTime = 0;
+    private boolean isSuccess = true;
+    private String requestId = MessageClientIDSetter.createUniqID();
+    private int contextCode = 0;
+    private List<TrackTraceBean> traceBeans;
+
+    public int getContextCode() {
+        return contextCode;
+    }
+
+    public void setContextCode(final int contextCode) {
+        this.contextCode = contextCode;
+    }
+
+    public List<TrackTraceBean> getTraceBeans() {
+        return traceBeans;
+    }
+
+    public void setTraceBeans(List<TrackTraceBean> traceBeans) {
+        this.traceBeans = traceBeans;
+    }
+
+    public String getRegionId() {
+        return regionId;
+    }
+
+    public void setRegionId(String regionId) {
+        this.regionId = regionId;
+    }
+
+    public TrackTraceType getTraceType() {
+        return traceType;
+    }
+
+    public void setTraceType(TrackTraceType traceType) {
+        this.traceType = traceType;
+    }
+
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    public void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public int getCostTime() {
+        return costTime;
+    }
+
+    public void setCostTime(int costTime) {
+        this.costTime = costTime;
+    }
+
+    public boolean isSuccess() {
+        return isSuccess;
+    }
+
+    public void setSuccess(boolean success) {
+        isSuccess = success;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public String getRegionName() {
+        return regionName;
+    }
+
+    public void setRegionName(String regionName) {
+        this.regionName = regionName;
+    }
+
+    @Override
+    public int compareTo(TrackTraceContext o) {
+        return (int) (this.timeStamp - o.getTimeStamp());
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(1024);
+        sb.append(traceType).append("_").append(groupName)
+            .append("_").append(regionId).append("_").append(isSuccess).append("_");
+        if (traceBeans != null && traceBeans.size() > 0) {
+            for (TrackTraceBean bean : traceBeans) {
+                sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
+            }
+        }
+        return "TrackTraceContext{" + sb.toString() + '}';
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java
new file mode 100644
index 000000000..3362106e6
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+import org.apache.rocketmq.common.message.MessageType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.rocketmq.client.trace.core.common.TrackTraceType.Pub;
+
+/**
+ * encode/decode for Track Trace Data
+ */
+public class TrackTraceDataEncoder {
+
+    /**
+     * resolving traceContext list From track trace data String
+     *
+     * @param traceData
+     * @return
+     */
+    public static List<TrackTraceContext> decoderFromTraceDataString(String traceData) {
+        List<TrackTraceContext> resList = new ArrayList<TrackTraceContext>();
+        if (traceData == null || traceData.length() <= 0) {
+            return resList;
+        }
+        String[] contextList = traceData.split(String.valueOf(TrackTraceConstants.FIELD_SPLITOR));
+        for (String context : contextList) {
+            String[] line = context.split(String.valueOf(TrackTraceConstants.CONTENT_SPLITOR));
+            if (line[0].equals(Pub.name())) {
+                TrackTraceContext pubContext = new TrackTraceContext();
+                pubContext.setTraceType(Pub);
+                pubContext.setTimeStamp(Long.parseLong(line[1]));
+                pubContext.setRegionId(line[2]);
+                pubContext.setGroupName(line[3]);
+                TrackTraceBean bean = new TrackTraceBean();
+                bean.setTopic(line[4]);
+                bean.setMsgId(line[5]);
+                bean.setTags(line[6]);
+                bean.setKeys(line[7]);
+                bean.setStoreHost(line[8]);
+                bean.setBodyLength(Integer.parseInt(line[9]));
+                pubContext.setCostTime(Integer.parseInt(line[10]));
+                bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]);
+
+                if (line.length == 13) {
+                    pubContext.setSuccess(Boolean.parseBoolean(line[12]));
+                } else if (line.length == 14) {
+                    bean.setOffsetMsgId(line[12]);
+                    pubContext.setSuccess(Boolean.parseBoolean(line[13]));
+                }
+                pubContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
+                pubContext.getTraceBeans().add(bean);
+                resList.add(pubContext);
+            } else if (line[0].equals(TrackTraceType.SubBefore.name())) {
+                TrackTraceContext subBeforeContext = new TrackTraceContext();
+                subBeforeContext.setTraceType(TrackTraceType.SubBefore);
+                subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
+                subBeforeContext.setRegionId(line[2]);
+                subBeforeContext.setGroupName(line[3]);
+                subBeforeContext.setRequestId(line[4]);
+                TrackTraceBean bean = new TrackTraceBean();
+                bean.setMsgId(line[5]);
+                bean.setRetryTimes(Integer.parseInt(line[6]));
+                bean.setKeys(line[7]);
+                subBeforeContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
+                subBeforeContext.getTraceBeans().add(bean);
+                resList.add(subBeforeContext);
+            } else if (line[0].equals(TrackTraceType.SubAfter.name())) {
+                TrackTraceContext subAfterContext = new TrackTraceContext();
+                subAfterContext.setTraceType(TrackTraceType.SubAfter);
+                subAfterContext.setRequestId(line[1]);
+                TrackTraceBean bean = new TrackTraceBean();
+                bean.setMsgId(line[2]);
+                bean.setKeys(line[5]);
+                subAfterContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
+                subAfterContext.getTraceBeans().add(bean);
+                subAfterContext.setCostTime(Integer.parseInt(line[3]));
+                subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
+                if (line.length >= 7) {
+                    // add the context type
+                    subAfterContext.setContextCode(Integer.parseInt(line[6]));
+                }
+                resList.add(subAfterContext);
+            }
+        }
+        return resList;
+    }
+
+    /**
+     * Encoding the trace context into track data strings and keyset sets
+     *
+     * @param ctx
+     * @return
+     */
+    public static TrackTraceTransferBean encoderFromContextBean(TrackTraceContext ctx) {
+        if (ctx == null) {
+            return null;
+        }
+        //build message track trace of the transfering entity content bean
+        TrackTraceTransferBean transferBean = new TrackTraceTransferBean();
+        StringBuilder sb = new StringBuilder(256);
+        switch (ctx.getTraceType()) {
+            case Pub: {
+                TrackTraceBean bean = ctx.getTraceBeans().get(0);
+                //append the content of context and traceBean to transferBean's TransData
+                sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getTopic()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getTags()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getStoreHost()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getBodyLength()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getMsgType().ordinal()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(bean.getOffsetMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                    .append(ctx.isSuccess()).append(TrackTraceConstants.FIELD_SPLITOR);
+            }
+            break;
+            case SubBefore: {
+                for (TrackTraceBean bean : ctx.getTraceBeans()) {
+                    sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(bean.getRetryTimes()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(bean.getKeys()).append(TrackTraceConstants.FIELD_SPLITOR);//
+                }
+            }
+            break;
+            case SubAfter: {
+                for (TrackTraceBean bean : ctx.getTraceBeans()) {
+                    sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.isSuccess()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)//
+                        .append(ctx.getContextCode()).append(TrackTraceConstants.FIELD_SPLITOR);
+                }
+            }
+            break;
+            default:
+        }
+        transferBean.setTransData(sb.toString());
+        for (TrackTraceBean bean : ctx.getTraceBeans()) {
+
+            transferBean.getTransKey().add(bean.getMsgId());
+            if (bean.getKeys() != null && bean.getKeys().length() > 0) {
+                transferBean.getTransKey().add(bean.getKeys());
+            }
+        }
+        return transferBean;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java
new file mode 100644
index 000000000..a22198e81
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+public enum TrackTraceDispatcherType {
+    PRODUCER,
+    CONSUMER
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java
new file mode 100644
index 000000000..9535b5d47
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * track trace transfering bean
+ */
+public class TrackTraceTransferBean {
+    private String transData;
+    private Set<String> transKey = new HashSet<String>();
+
+    public String getTransData() {
+        return transData;
+    }
+
+    public void setTransData(String transData) {
+        this.transData = transData;
+    }
+
+    public Set<String> getTransKey() {
+        return transKey;
+    }
+
+    public void setTransKey(Set<String> transKey) {
+        this.transKey = transKey;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java
new file mode 100644
index 000000000..72fa63055
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.common;
+
+public enum TrackTraceType {
+    Pub,
+    SubBefore,
+    SubAfter,
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java
new file mode 100644
index 000000000..9b282bb0b
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.dispatch;
+
+import java.util.Properties;
+import org.apache.rocketmq.client.exception.MQClientException;
+import java.io.IOException;
+
+/**
+ * Interface of asynchronous transfer data
+ */
+public interface AsyncDispatcher {
+
+    /**
+     * Initialize asynchronous transfer data module
+     */
+    void start(Properties properties) throws MQClientException;
+
+    /**
+     * append the transfering data
+     * @param ctx data infomation
+     * @return
+     */
+    boolean append(Object ctx);
+
+    /**
+     * write flush action
+     *
+     * @throws IOException
+     */
+    void flush() throws IOException;
+
+    /**
+     * close the track trace Hook
+     */
+    void shutdown();
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
new file mode 100644
index 000000000..1c1c4c381
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.dispatch.impl;
+
+import org.apache.rocketmq.client.common.ThreadLocalIndex;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceDataEncoder;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceTransferBean;
+import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.Properties;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by zongtanghu on 2018/11/6.
+ */
+public class AsyncArrayDispatcher implements AsyncDispatcher {
+
+    private final static InternalLogger log = ClientLogger.getLog();
+    private final int queueSize;
+    private final int batchSize;
+    private final DefaultMQProducer traceProducer;
+    private final ThreadPoolExecutor traceExecuter;
+    // the last discard number of log
+    private AtomicLong discardCount;
+    private Thread worker;
+    private ArrayBlockingQueue<TrackTraceContext> traceContextQueue;
+    private ArrayBlockingQueue<Runnable> appenderQueue;
+    private volatile Thread shutDownHook;
+    private volatile boolean stopped = false;
+    private String dispatcherType;
+    private DefaultMQProducerImpl hostProducer;
+    private DefaultMQPushConsumerImpl hostConsumer;
+    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
+    private String dispatcherId = UUID.randomUUID().toString();
+
+    public AsyncArrayDispatcher(Properties properties) throws MQClientException {
+        dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
+        int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
+        // queueSize is greater than or equal to the n power of 2 of value
+        queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
+        this.queueSize = queueSize;
+        batchSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_BATCH_NUM, "1"));
+        this.discardCount = new AtomicLong(0L);
+        traceContextQueue = new ArrayBlockingQueue<TrackTraceContext>(1024);
+        appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
+
+        this.traceExecuter = new ThreadPoolExecutor(//
+            10, //
+            20, //
+            1000 * 60, //
+            TimeUnit.MILLISECONDS, //
+            this.appenderQueue, //
+            new ThreadFactoryImpl("MQTraceSendThread_"));
+        traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
+    }
+
+    public DefaultMQProducer getTraceProducer() {
+        return traceProducer;
+    }
+    
+    public DefaultMQProducerImpl getHostProducer() {
+        return hostProducer;
+    }
+
+    public void setHostProducer(DefaultMQProducerImpl hostProducer) {
+        this.hostProducer = hostProducer;
+    }
+
+    public DefaultMQPushConsumerImpl getHostConsumer() {
+        return hostConsumer;
+    }
+
+    public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
+        this.hostConsumer = hostConsumer;
+    }
+
+    public void start(Properties properties) throws MQClientException {
+        TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
+        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
+        this.worker.setDaemon(true);
+        this.worker.start();
+        this.registerShutDownHook();
+    }
+
+    @Override
+    public boolean append(final Object ctx) {
+        boolean result = traceContextQueue.offer((TrackTraceContext) ctx);
+        if (!result) {
+            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
+        }
+        return result;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // the maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
+        long end = System.currentTimeMillis() + 500;
+        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                break;
+            }
+        }
+        log.info("------end trace send " + traceContextQueue.size() + "   " + appenderQueue.size());
+    }
+
+    @Override
+    public void shutdown() {
+        this.stopped = true;
+        this.traceExecuter.shutdown();
+        TrackTraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
+        this.removeShutdownHook();
+    }
+
+    public void registerShutDownHook() {
+        if (shutDownHook == null) {
+            shutDownHook = new Thread(new Runnable() {
+                private volatile boolean hasShutdown = false;
+
+                @Override
+                public void run() {
+                    synchronized (this) {
+                        if (!this.hasShutdown) {
+                            try {
+                                flush();
+                            } catch (IOException e) {
+                                log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
+                            }
+                        }
+                    }
+                }
+            }, "ShutdownHookMQTrace");
+            Runtime.getRuntime().addShutdownHook(shutDownHook);
+        }
+    }
+
+    public void removeShutdownHook() {
+        if (shutDownHook != null) {
+            Runtime.getRuntime().removeShutdownHook(shutDownHook);
+        }
+    }
+
+    class AsyncRunnable implements Runnable {
+        private boolean stopped;
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                List<TrackTraceContext> contexts = new ArrayList<TrackTraceContext>(batchSize);
+                for (int i = 0; i < batchSize; i++) {
+                    TrackTraceContext context = null;
+                    try {
+                        //get track trace data element from blocking Queue — traceContextQueue
+                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                    }
+                    if (context != null) {
+                        contexts.add(context);
+                    } else {
+                        break;
+                    }
+                }
+                if (contexts.size() > 0) {
+                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
+                    traceExecuter.submit(request);
+                } else if (AsyncArrayDispatcher.this.stopped) {
+                    this.stopped = true;
+                }
+            }
+
+        }
+    }
+
+    class AsyncAppenderRequest implements Runnable {
+        List<TrackTraceContext> contextList;
+
+        public AsyncAppenderRequest(final List<TrackTraceContext> contextList) {
+            if (contextList != null) {
+                this.contextList = contextList;
+            } else {
+                this.contextList = new ArrayList<TrackTraceContext>(1);
+            }
+        }
+
+        @Override
+        public void run() {
+            sendTraceData(contextList);
+        }
+        
+        public void sendTraceData(List<TrackTraceContext> contextList) {
+            Map<String, List<TrackTraceTransferBean>> transBeanMap = new HashMap<String, List<TrackTraceTransferBean>>();
+            for (TrackTraceContext context : contextList) {
+                if (context.getTraceBeans().isEmpty()) {
+                    continue;
+                }
+                //1.topic value corresponding to original message entity content
+                String topic = context.getTraceBeans().get(0).getTopic();
+                //2.use  original message entity's topic as key
+                String key = topic;
+                List<TrackTraceTransferBean> transBeanList = transBeanMap.get(key);
+                if (transBeanList == null) {
+                    transBeanList = new ArrayList<TrackTraceTransferBean>();
+                    transBeanMap.put(key, transBeanList);
+                }
+                TrackTraceTransferBean traceData = TrackTraceDataEncoder.encoderFromContextBean(context);
+                transBeanList.add(traceData);
+            }
+            for (Map.Entry<String, List<TrackTraceTransferBean>> entry : transBeanMap.entrySet()) {
+                //key -> dataTopic(Not trace Topic)
+                String dataTopic =  entry.getKey();
+                flushData(entry.getValue(), dataTopic);
+            }
+        }
+
+        /**
+         * batch sending data actually
+         */
+        private void flushData(List<TrackTraceTransferBean> transBeanList, String topic) {
+            if (transBeanList.size() == 0) {
+                return;
+            }
+            // temporary buffer
+            StringBuilder buffer = new StringBuilder(1024);
+            int count = 0;
+            Set<String> keySet = new HashSet<String>();
+
+            for (TrackTraceTransferBean bean : transBeanList) {
+                // keyset of message track trace includes msgId of or original message
+                keySet.addAll(bean.getTransKey());
+                buffer.append(bean.getTransData());
+                count++;
+                // Ensure that the size of the package should not exceed the upper limit.
+                if (buffer.length() >= traceProducer.getMaxMessageSize()) {
+                    sendTraceDataByMQ(keySet, buffer.toString());
+                    // clear temporary buffer after finishing
+                    buffer.delete(0, buffer.length());
+                    keySet.clear();
+                    count = 0;
+                }
+            }
+            if (count > 0) {
+                sendTraceDataByMQ(keySet, buffer.toString());
+            }
+            transBeanList.clear();
+        }
+
+        /**
+         * send message track trace data
+         *
+         * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
+         * @param data   the message track trace data in this batch
+         */
+        private void sendTraceDataByMQ(Set<String> keySet, final String data) {
+            String topic = TrackTraceConstants.TRACE_TOPIC;
+            final Message message = new Message(topic, data.getBytes());
+
+            //keyset of message track trace includes msgId of or original message
+            message.setKeys(keySet);
+            try {
+                Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
+                SendCallback callback = new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                    }
+
+                    @Override
+                    public void onException(Throwable e) {
+                        log.info("send trace data ,the traceData is " + data);
+                    }
+                };
+                if (traceBrokerSet.isEmpty()) {
+                    //no cross set
+                    traceProducer.send(message, callback, 5000);
+                } else {
+                    traceProducer.send(message, new MessageQueueSelector() {
+                        @Override
+                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                            Set<String> brokerSet = (Set<String>) arg;
+                            List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
+                            for (MessageQueue queue : mqs) {
+                                if (brokerSet.contains(queue.getBrokerName())) {
+                                    filterMqs.add(queue);
+                                }
+                            }
+                            int index = sendWhichQueue.getAndIncrement();
+                            int pos = Math.abs(index) % filterMqs.size();
+                            if (pos < 0) {
+                                pos = 0;
+                            }
+                            return filterMqs.get(pos);
+                        }
+                    }, traceBrokerSet, callback);
+                }
+
+            } catch (Exception e) {
+                log.info("send trace data,the traceData is" + data);
+            }
+        }
+
+        private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
+            Set<String> brokerSet = new HashSet<String>();
+            TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
+            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+                producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
+                producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+                topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
+            }
+            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
+                for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
+                    brokerSet.add(queue.getBrokerName());
+                }
+            }
+            return brokerSet;
+        }
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
new file mode 100644
index 000000000..27447df73
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.dispatch.impl;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TrackTraceProducerFactory {
+
+    private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>();
+    private static AtomicBoolean isStarted = new AtomicBoolean(false);
+    private static DefaultMQProducer traceProducer;
+
+
+    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
+        if (traceProducer == null) {
+
+            traceProducer = new DefaultMQProducer();
+            traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
+            traceProducer.setSendMsgTimeout(5000);
+            traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
+            String nameSrv = properties.getProperty(TrackTraceConstants.NAMESRV_ADDR);
+            if (nameSrv == null) {
+                TopAddressing topAddressing = new TopAddressing(properties.getProperty(TrackTraceConstants.ADDRSRV_URL));
+                nameSrv = topAddressing.fetchNSAddr();
+            }
+            traceProducer.setNamesrvAddr(nameSrv);
+            traceProducer.setVipChannelEnabled(false);
+            //the max size of message is 128K
+            int maxSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_MSG_SIZE, "128000"));
+            traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
+        }
+        return traceProducer;
+    }
+
+    public static void registerTraceDispatcher(String dispatcherId, String nameSrvAddr) throws MQClientException {
+        dispatcherTable.put(dispatcherId, new Object());
+        if (traceProducer != null && isStarted.compareAndSet(false, true)) {
+            traceProducer.setNamesrvAddr(nameSrvAddr);
+            traceProducer.start();
+        }
+    }
+
+    public static void unregisterTraceDispatcher(String dispatcherId) {
+        dispatcherTable.remove(dispatcherId);
+        if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) {
+            traceProducer.shutdown();
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java
new file mode 100644
index 000000000..df88ce2d5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.hook;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
+import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
+
+    private AsyncDispatcher localDispatcher;
+
+    public ConsumeMessageTraceHookImpl(AsyncDispatcher localDispatcher) {
+        this.localDispatcher = localDispatcher;
+    }
+
+    @Override
+    public String hookName() {
+        return "ConsumeMessageTraceHook";
+    }
+
+    @Override
+    public void consumeMessageBefore(ConsumeMessageContext context) {
+        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+            return;
+        }
+        TrackTraceContext traceContext = new TrackTraceContext();
+        context.setMqTraceContext(traceContext);
+        traceContext.setTraceType(TrackTraceType.SubBefore);//
+        traceContext.setGroupName(context.getConsumerGroup());//
+        List<TrackTraceBean> beans = new ArrayList<TrackTraceBean>();
+        for (MessageExt msg : context.getMsgList()) {
+            if (msg == null) {
+                continue;
+            }
+            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
+            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
+
+            if (traceOn != null && traceOn.equals("false")) {
+                // if trace switch is false ,skip it
+                continue;
+            }
+            TrackTraceBean traceBean = new TrackTraceBean();
+            traceBean.setTopic(msg.getTopic());//
+            traceBean.setMsgId(msg.getMsgId());//
+            traceBean.setTags(msg.getTags());//
+            traceBean.setKeys(msg.getKeys());//
+            traceBean.setStoreTime(msg.getStoreTimestamp());//
+            traceBean.setBodyLength(msg.getStoreSize());//
+            traceBean.setRetryTimes(msg.getReconsumeTimes());//
+            traceContext.setRegionId(regionId);//
+            beans.add(traceBean);
+        }
+        if (beans.size() > 0) {
+            traceContext.setTraceBeans(beans);
+            traceContext.setTimeStamp(System.currentTimeMillis());
+            localDispatcher.append(traceContext);
+        }
+    }
+
+    @Override
+    public void consumeMessageAfter(ConsumeMessageContext context) {
+        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+            return;
+        }
+        TrackTraceContext subBeforeContext = (TrackTraceContext) context.getMqTraceContext();
+
+        if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
+            // if subbefore bean is null ,skip it
+            return;
+        }
+        TrackTraceContext subAfterContext = new TrackTraceContext();
+        subAfterContext.setTraceType(TrackTraceType.SubAfter);//
+        subAfterContext.setRegionId(subBeforeContext.getRegionId());//
+        subAfterContext.setGroupName(subBeforeContext.getGroupName());//
+        subAfterContext.setRequestId(subBeforeContext.getRequestId());//
+        subAfterContext.setSuccess(context.isSuccess());//
+
+        //caculate the cost time for processing messages
+        int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
+        subAfterContext.setCostTime(costTime);//
+        subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
+        String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
+        if (contextType != null) {
+            subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
+        }
+        localDispatcher.append(subAfterContext);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
new file mode 100644
index 000000000..0c38e2203
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.core.hook;
+
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
+import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
+import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.common.MixAll;
+import java.util.ArrayList;
+
+public class SendMessageTrackHookImpl implements SendMessageHook {
+
+    private AsyncDispatcher localDispatcher;
+
+    public SendMessageTrackHookImpl(AsyncDispatcher localDispatcher) {
+        this.localDispatcher = localDispatcher;
+    }
+
+    @Override
+    public String hookName() {
+        return "SendMessageTrackHook";
+    }
+
+    @Override
+    public void sendMessageBefore(SendMessageContext context) {
+        //if it is message track trace data,then it doesn't recorded
+        if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
+            return;
+        }
+        //build the context content of TuxeTraceContext
+        TrackTraceContext tuxeContext = new TrackTraceContext();
+        tuxeContext.setTraceBeans(new ArrayList<TrackTraceBean>(1));
+        context.setMqTraceContext(tuxeContext);
+        tuxeContext.setTraceType(TrackTraceType.Pub);
+        tuxeContext.setGroupName(context.getProducerGroup());
+
+        //build the data bean object of message track trace
+        TrackTraceBean traceBean = new TrackTraceBean();
+        traceBean.setTopic(context.getMessage().getTopic());
+        traceBean.setTags(context.getMessage().getTags());
+        traceBean.setKeys(context.getMessage().getKeys());
+        traceBean.setStoreHost(context.getBrokerAddr());
+        traceBean.setBodyLength(context.getMessage().getBody().length);
+        traceBean.setMsgType(context.getMsgType());
+        tuxeContext.getTraceBeans().add(traceBean);
+    }
+
+    @Override
+    public void sendMessageAfter(SendMessageContext context) {
+        //if it is message track trace data,then it doesn't recorded
+        if (context == null || context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC)
+            || context.getMqTraceContext() == null) {
+            return;
+        }
+        if (context.getSendResult() == null) {
+            return;
+        }
+
+        if (context.getSendResult().getRegionId() == null
+            || !context.getSendResult().isTraceOn()) {
+            // if switch is false,skip it
+            return;
+        }
+
+        TrackTraceContext tuxeContext = (TrackTraceContext) context.getMqTraceContext();
+        TrackTraceBean traceBean = tuxeContext.getTraceBeans().get(0);
+        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
+        tuxeContext.setCostTime(costTime);
+        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
+            tuxeContext.setSuccess(true);
+        } else {
+            tuxeContext.setSuccess(false);
+        }
+        tuxeContext.setRegionId(context.getSendResult().getRegionId());
+        traceBean.setMsgId(context.getSendResult().getMsgId());
+        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
+        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
+        localDispatcher.append(tuxeContext);
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
new file mode 100644
index 000000000..27c10dad5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullMessageService;
+import org.apache.rocketmq.client.impl.consumer.PullRequest;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQConsumerWithTraceTest {
+    private String consumerGroup;
+    private String consumerGroupNormal;
+    private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis();
+
+    private String topic = "FooBar";
+    private String brokerName = "BrokerA";
+    private MQClientInstance mQClientFactory;
+
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    private PullAPIWrapper pullAPIWrapper;
+    private RebalancePushImpl rebalancePushImpl;
+    private DefaultMQPushConsumer pushConsumer;
+    private DefaultMQPushConsumer normalPushConsumer;
+
+    private AsyncArrayDispatcher asyncArrayDispatcher;
+    private MQClientInstance mQClientTraceFactory;
+    @Mock
+    private MQClientAPIImpl mQClientTraceAPIImpl;
+    private DefaultMQProducer traceProducer;
+
+
+    @Before
+    public void init() throws Exception {
+        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
+        pushConsumer = new DefaultMQPushConsumer(consumerGroup,true);
+        consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
+        normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false);
+
+        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pushConsumer.setPullInterval(60 * 1000);
+
+        asyncArrayDispatcher = (AsyncArrayDispatcher)pushConsumer.getTraceDispatcher();
+        traceProducer = asyncArrayDispatcher.getTraceProducer();
+
+
+        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                return null;
+            }
+        });
+
+        DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+        rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+        Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, rebalancePushImpl);
+        pushConsumer.subscribe(topic, "*");
+        pushConsumer.start();
+
+        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
+        mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
+
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, mQClientFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        fieldTrace.setAccessible(true);
+        fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory);
+
+        fieldTrace = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        fieldTrace.setAccessible(true);
+        fieldTrace.set(mQClientTraceFactory, mQClientTraceAPIImpl);
+
+        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+        field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+        field.setAccessible(true);
+        field.set(pushConsumerImpl, pullAPIWrapper);
+
+        pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+        mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
+
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+            anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+            .thenAnswer(new Answer<Object>() {
+                @Override
+                public Object answer(InvocationOnMock mock) throws Throwable {
+                    PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                    MessageClientExt messageClientExt = new MessageClientExt();
+                    messageClientExt.setTopic(topic);
+                    messageClientExt.setQueueId(0);
+                    messageClientExt.setMsgId("123");
+                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setOffsetMsgId("234");
+                    messageClientExt.setBornHost(new InetSocketAddress(8080));
+                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                    PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                    ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                    return pullResult;
+                }
+            });
+
+        doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
+        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createPullRequest().getMessageQueue());
+        pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+    }
+
+    @After
+    public void terminate() {
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+        traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
+        //when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        //when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
+
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final MessageExt[] messageExts = new MessageExt[1];
+        pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                messageExts[0] = msgs.get(0);
+                countDownLatch.countDown();
+                return null;
+            }
+        }));
+
+        PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        assertThat(messageExts[0].getTopic()).isEqualTo(topic);
+        assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+    }
+
+    private PullRequest createPullRequest() {
+        PullRequest pullRequest = new PullRequest();
+        pullRequest.setConsumerGroup(consumerGroup);
+        pullRequest.setNextOffset(1024);
+
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName(brokerName);
+        messageQueue.setQueueId(0);
+        messageQueue.setTopic(topic);
+        pullRequest.setMessageQueue(messageQueue);
+        ProcessQueue processQueue = new ProcessQueue();
+        processQueue.setLocked(true);
+        processQueue.setLastLockTimestamp(System.currentTimeMillis());
+        pullRequest.setProcessQueue(processQueue);
+
+        return pullRequest;
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+        List<MessageExt> messageExtList) throws Exception {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (MessageExt messageExt : messageExtList) {
+            outputStream.write(MessageDecoder.encode(messageExt, false));
+        }
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
+    }
+
+    public static TopicRouteData createTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSynFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("123");
+        sendResult.setOffsetMsgId("123");
+        sendResult.setQueueOffset(456);
+        sendResult.setSendStatus(sendStatus);
+        sendResult.setRegionId("HZ");
+        return sendResult;
+    }
+
+    public static TopicRouteData createTraceTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("broker-trace");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10912");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("broker-trace");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(1);
+        queueData.setWriteQueueNums(1);
+        queueData.setTopicSynFlag(1);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
new file mode 100644
index 000000000..c3757a8af
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQProducerWithTraceTest {
+
+    @Spy
+    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    @Spy
+    private MQClientInstance mQClientTraceFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    @Mock
+    private MQClientAPIImpl mQClientTraceAPIImpl;
+
+    private AsyncArrayDispatcher asyncArrayDispatcher;
+
+    private DefaultMQProducer producer;
+    private DefaultMQProducer traceProducer;
+    private DefaultMQProducer normalProducer;
+
+    private Message message;
+    private String topic = "FooBar";
+    private String producerGroupPrefix = "FooBar_PID";
+    private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+    private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis();
+    
+    @Before
+    public void init() throws Exception {
+
+        normalProducer = new DefaultMQProducer(producerGroupTemp,false);
+        producer = new DefaultMQProducer(producerGroupTemp,true);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        normalProducer.setNamesrvAddr("127.0.0.1:9877");
+        message = new Message(topic, new byte[] {'a', 'b' ,'c'});
+        asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher();
+        traceProducer = asyncArrayDispatcher.getTraceProducer();
+
+        producer.start();
+        
+        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
+
+        Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+        fieldTrace.setAccessible(true);
+        fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl);
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientTraceFactory, mQClientTraceAPIImpl);
+
+        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+        
+    }
+
+    @Test
+    public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            producer.send(message);
+        }catch (MQClientException e){
+        }
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+
+    }
+
+    @Test
+    public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            producer.send(message);
+        }catch (MQClientException e){
+        }
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown();
+    }
+
+    public static TopicRouteData createTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSynFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("123");
+        sendResult.setOffsetMsgId("123");
+        sendResult.setQueueOffset(456);
+        sendResult.setSendStatus(sendStatus);
+        sendResult.setRegionId("HZ");
+        return sendResult;
+    }
+
+    public static TopicRouteData createTraceTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("broker-trace");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10912");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("broker-trace");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(1);
+        queueData.setWriteQueueNums(1);
+        queueData.setTopicSynFlag(1);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+        return topicRouteData;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f81af2165..77d492ecb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -51,7 +51,8 @@
     @ImportantField
     private boolean autoCreateSubscriptionGroup = true;
     private String messageStorePlugIn = "";
-
+    @ImportantField
+    private boolean autoTraceBrokerEnable = false;
     /**
      * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
      * value is 1.
@@ -732,4 +733,12 @@ public long getWaitTimeMillsInTransactionQueue() {
     public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
         this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
     }
+
+    public boolean isAutoTraceBrokerEnable() {
+        return autoTraceBrokerEnable;
+    }
+
+    public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) {
+        this.autoTraceBrokerEnable = autoTraceBrokerEnable;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 20d186764..0573c762a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -82,7 +82,8 @@
     public static final long CURRENT_JVM_PID = getPID();
 
     public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
-
+    public static final String TRACE_BROKER_NAME_SUFFIX = "trace";
+    
     public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
     public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
     public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
@@ -90,6 +91,7 @@
     public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
 
     public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
+    public static final String RMQ_SYS_TRACK_TRACE_TOPIC = "RMQ_SYS_TRACK_TRACE_TOPIC";
     public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
     public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
 
diff --git a/distribution/conf/2m-noslave/broker-a.properties b/distribution/conf/2m-noslave/broker-a.properties
index b704b54c5..cd051cdc1 100644
--- a/distribution/conf/2m-noslave/broker-a.properties
+++ b/distribution/conf/2m-noslave/broker-a.properties
@@ -19,3 +19,4 @@ deleteWhen=04
 fileReservedTime=48
 brokerRole=ASYNC_MASTER
 flushDiskType=ASYNC_FLUSH
+autoTraceBrokerEnable=false
\ No newline at end of file
diff --git a/distribution/conf/2m-noslave/broker-b.properties b/distribution/conf/2m-noslave/broker-b.properties
index 130671a7c..da00cd419 100644
--- a/distribution/conf/2m-noslave/broker-b.properties
+++ b/distribution/conf/2m-noslave/broker-b.properties
@@ -19,3 +19,4 @@ deleteWhen=04
 fileReservedTime=48
 brokerRole=ASYNC_MASTER
 flushDiskType=ASYNC_FLUSH
+autoTraceBrokerEnable=false
\ No newline at end of file
diff --git a/distribution/conf/2m-noslave/broker-trace.properties b/distribution/conf/2m-noslave/broker-trace.properties
new file mode 100644
index 000000000..a4898aae0
--- /dev/null
+++ b/distribution/conf/2m-noslave/broker-trace.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+brokerClusterName=DefaultCluster
+brokerName=broker-trace
+brokerId=0
+deleteWhen=04
+fileReservedTime=48
+brokerRole=ASYNC_MASTER
+flushDiskType=ASYNC_FLUSH
+autoTraceBrokerEnable=true
\ No newline at end of file
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 7b504dd2a..448f8ee9f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -26,7 +26,6 @@
     public static void main(String[] args) throws MQClientException, InterruptedException {
 
         DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
-
         producer.start();
 
         for (int i = 0; i < 128; i++)
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
index c6c7e39d1..abbfbdffc 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -29,10 +29,10 @@
 
     public static void main(String[] args) throws InterruptedException, MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
-        consumer.subscribe("Jodie_topic_1023", "*");
+        consumer.subscribe("TopicTest", "*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         //wrong time format 2017_0422_221800
-        consumer.setConsumeTimestamp("20170422221800");
+        consumer.setConsumeTimestamp("20181109221800");
         consumer.registerMessageListener(new MessageListenerConcurrently() {
 
             @Override
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
new file mode 100644
index 000000000..fb8e37fd2
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class TraceProducer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+
+        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
+        producer.start();
+
+        for (int i = 0; i < 128; i++)
+            try {
+                {
+                    Message msg = new Message("TopicTest",
+                        "TagA",
+                        "OrderID188",
+                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                    SendResult sendResult = producer.send(msg);
+                    System.out.printf("%s%n", sendResult);
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        producer.shutdown();
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
new file mode 100644
index 000000000..b9710d4a7
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class TracePushConsumer {
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
+        consumer.subscribe("TopicTest", "*");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        //wrong time format 2017_0422_221800
+        consumer.setConsumeTimestamp("20181109221800");
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+}
diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
index d2adac53d..7ba580df9 100644
--- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
+++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
@@ -63,7 +63,7 @@ public MQProducer getInstance(String nameServerAddress, String group) throws MQC
             return p;
         }
 
-        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
+        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group,false);
         defaultMQProducer.setNamesrvAddr(nameServerAddress);
         MQProducer beforeProducer = null;
         beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
index 38904c0bf..128939385 100644
--- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
+++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
@@ -39,7 +39,7 @@
 
     @Before
     public void mockLoggerAppender() throws Exception {
-        DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender"));
+        DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender",false));
         doAnswer(new Answer<Void>() {
             @Override
             public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index ce739be59..a828e8ddf 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -46,7 +46,7 @@ public void create() {
     }
 
     public void create(boolean useTLS) {
-        consumer = new DefaultMQPushConsumer(consumerGroup);
+        consumer = new DefaultMQPushConsumer(consumerGroup,false);
         consumer.setInstanceName(RandomUtil.getStringByUUID());
         consumer.setNamesrvAddr(nsAddr);
         try {
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
index 66767cc9f..abd1e3b64 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
@@ -24,7 +24,7 @@
 public class ProducerFactory {
 
     public static DefaultMQProducer getRMQProducer(String ns) {
-        DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
+        DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID(),false);
         producer.setNamesrvAddr(ns);
         try {
             producer.start();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 6bb8caad4..675fc2a81 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -213,7 +213,7 @@ public Options buildCommandlineOptions(Options options) {
     public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById");
+        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById",false);
         defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         try {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
index 9bf09ad41..3debb3d15 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
@@ -65,7 +65,7 @@
     private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
         MixAll.TOOLS_CONSUMER_GROUP);
     private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(
-        MixAll.MONITOR_CONSUMER_GROUP);
+        MixAll.MONITOR_CONSUMER_GROUP,false);
 
     public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) {
         this.monitorConfig = monitorConfig;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services