You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/19 11:54:05 UTC

[GitHub] dongeforever closed pull request #600: [ISSUE #525] Support the message track, add the function which supports trace topic name value configurable by users.

dongeforever closed pull request #600: [ISSUE #525] Support the message track,add the function which supports trace topic name value configurable by users.
URL: https://github.com/apache/rocketmq/pull/600
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 163897b1c..bd5eaeeab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -126,7 +126,7 @@ public TopicConfigManager(BrokerController brokerController) {
         }
         {
             if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
-                String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+                String topic = this.brokerController.getBrokerConfig().getMsgTrackTopicName();
                 TopicConfig topicConfig = new TopicConfig(topic);
                 this.systemTopicList.add(topic);
                 topicConfig.setReadQueueNums(1);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 0b5ce0521..179a80daa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -289,9 +289,10 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
      * @param rpcHook RPC hook to execute before each remoting command.
      * @param allocateMessageQueueStrategy message queue allocating algorithm.
      * @param msgTraceSwitch switch flag instance for message track trace.
+     * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
-        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) {
         this.consumerGroup = consumerGroup;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
@@ -303,6 +304,11 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
                 tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
                 tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
                 tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name());
+                if (!UtilAll.isBlank(traceTopicName)) {
+                    tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName);
+                } else {
+                    tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
+                }
                 AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
                 dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
                 traceDispatcher = dispatcher;
@@ -329,9 +335,10 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
      *
      * @param consumerGroup Consumer group.
      * @param msgTraceSwitch switch flag instance for message track trace.
+     * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
      */
-    public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) {
-        this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch);
+    public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) {
+        this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName);
     }
 
     /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 8bee795d0..3c33d2eed 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -33,6 +33,7 @@
 import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
 import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -138,7 +139,7 @@
      * Default constructor.
      */
     public DefaultMQProducer() {
-        this(MixAll.DEFAULT_PRODUCER_GROUP, null,false);
+        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
     }
 
     /**
@@ -158,8 +159,9 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
      * @param producerGroup Producer group, see the name-sake field.
      * @param rpcHook RPC hook to execute per each remoting command execution.
      * @param msgTraceSwitch switch flag instance for message track trace.
+     * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
      */
-    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) {
+    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) {
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
         //if client open the message track trace feature
@@ -171,6 +173,11 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms
                 tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
                 tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
                 tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name());
+                if (!UtilAll.isBlank(traceTopicName)) {
+                    tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName);
+                } else {
+                    tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
+                }
                 AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
                 dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
@@ -197,9 +204,10 @@ public DefaultMQProducer(final String producerGroup) {
      *
      * @param producerGroup Producer group, see the name-sake field.
      * @param msgTraceSwitch switch flag instance for message track trace.
+     * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
      */
-    public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
-        this(producerGroup, null, msgTraceSwitch);
+    public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) {
+        this(producerGroup, null, msgTraceSwitch, traceTopicName);
     }
 
     /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
index aa49d1c5b..a8868614b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.client.trace.core.common;
 
-import org.apache.rocketmq.common.MixAll;
-
 public class TrackTraceConstants {
     public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
     public static final String ADDRSRV_URL = "ADDRSRV_URL";
@@ -27,7 +25,7 @@
     public static final String WAKE_UP_NUM = "WakeUpNum";
     public static final String MAX_MSG_SIZE = "MaxMsgSize";
     public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
-    public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+    public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME";
     public static final char CONTENT_SPLITOR = (char) 1;
     public static final char FIELD_SPLITOR = (char) 2;
     public static final String TRACE_DISPATCHER_TYPE = "DispatcherType";
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index 1c1c4c381..90b00d414 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -72,6 +72,7 @@
     private DefaultMQPushConsumerImpl hostConsumer;
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private String dispatcherId = UUID.randomUUID().toString();
+    private String traceTopicName;
 
     public AsyncArrayDispatcher(Properties properties) throws MQClientException {
         dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
@@ -83,7 +84,7 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
         this.discardCount = new AtomicLong(0L);
         traceContextQueue = new ArrayBlockingQueue<TrackTraceContext>(1024);
         appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
-
+        traceTopicName = properties.getProperty(TrackTraceConstants.TRACE_TOPIC);
         this.traceExecuter = new ThreadPoolExecutor(//
             10, //
             20, //
@@ -94,6 +95,14 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
         traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
     }
 
+    public String getTraceTopicName() {
+        return traceTopicName;
+    }
+
+    public void setTraceTopicName(String traceTopicName) {
+        this.traceTopicName = traceTopicName;
+    }
+    
     public DefaultMQProducer getTraceProducer() {
         return traceProducer;
     }
@@ -115,7 +124,7 @@ public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
     }
 
     public void start(Properties properties) throws MQClientException {
-        TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
+        TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
         this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
         this.worker.setDaemon(true);
         this.worker.start();
@@ -247,16 +256,14 @@ public void sendTraceData(List<TrackTraceContext> contextList) {
                 transBeanList.add(traceData);
             }
             for (Map.Entry<String, List<TrackTraceTransferBean>> entry : transBeanMap.entrySet()) {
-                //key -> dataTopic(Not trace Topic)
-                String dataTopic =  entry.getKey();
-                flushData(entry.getValue(), dataTopic);
+                flushData(entry.getValue());
             }
         }
 
         /**
          * batch sending data actually
          */
-        private void flushData(List<TrackTraceTransferBean> transBeanList, String topic) {
+        private void flushData(List<TrackTraceTransferBean> transBeanList) {
             if (transBeanList.size() == 0) {
                 return;
             }
@@ -292,7 +299,7 @@ private void flushData(List<TrackTraceTransferBean> transBeanList, String topic)
          * @param data   the message track trace data in this batch
          */
         private void sendTraceDataByMQ(Set<String> keySet, final String data) {
-            String topic = TrackTraceConstants.TRACE_TOPIC;
+            String topic = traceTopicName;
             final Message message = new Message(topic, data.getBytes());
 
             //keyset of message track trace includes msgId of or original message
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
index 0c38e2203..c174f462d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
@@ -20,11 +20,10 @@
 import org.apache.rocketmq.client.hook.SendMessageHook;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
-import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
 import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
 import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
 import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
-import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
 import java.util.ArrayList;
 
 public class SendMessageTrackHookImpl implements SendMessageHook {
@@ -43,7 +42,7 @@ public String hookName() {
     @Override
     public void sendMessageBefore(SendMessageContext context) {
         //if it is message track trace data,then it doesn't recorded
-        if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
+        if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())) {
             return;
         }
         //build the context content of TuxeTraceContext
@@ -67,7 +66,7 @@ public void sendMessageBefore(SendMessageContext context) {
     @Override
     public void sendMessageAfter(SendMessageContext context) {
         //if it is message track trace data,then it doesn't recorded
-        if (context == null || context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC)
+        if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())
             || context.getMqTraceContext() == null) {
             return;
         }
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 27c10dad5..08382dfe8 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -100,21 +100,23 @@
     private RebalancePushImpl rebalancePushImpl;
     private DefaultMQPushConsumer pushConsumer;
     private DefaultMQPushConsumer normalPushConsumer;
+    private DefaultMQPushConsumer customTraceTopicpushConsumer;
+
 
     private AsyncArrayDispatcher asyncArrayDispatcher;
     private MQClientInstance mQClientTraceFactory;
     @Mock
     private MQClientAPIImpl mQClientTraceAPIImpl;
     private DefaultMQProducer traceProducer;
-
+    private String customerTraceTopic = "rmq_track_trace_topic_12345";
 
     @Before
     public void init() throws Exception {
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
-        pushConsumer = new DefaultMQPushConsumer(consumerGroup,true);
+        pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
         consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
-        normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false);
-
+        normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,"");
+        customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
 
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index c3757a8af..9460d6834 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -76,6 +76,7 @@
     private AsyncArrayDispatcher asyncArrayDispatcher;
 
     private DefaultMQProducer producer;
+    private DefaultMQProducer customTraceTopicproducer;
     private DefaultMQProducer traceProducer;
     private DefaultMQProducer normalProducer;
 
@@ -84,16 +85,22 @@
     private String producerGroupPrefix = "FooBar_PID";
     private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
     private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis();
-    
+    private String customerTraceTopic = "rmq_track_trace_topic_12345";
+
     @Before
     public void init() throws Exception {
 
-        normalProducer = new DefaultMQProducer(producerGroupTemp,false);
-        producer = new DefaultMQProducer(producerGroupTemp,true);
+        customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp,false, customerTraceTopic);
+        normalProducer = new DefaultMQProducer(producerGroupTemp,false,"");
+        producer = new DefaultMQProducer(producerGroupTemp,true,"");
         producer.setNamesrvAddr("127.0.0.1:9876");
         normalProducer.setNamesrvAddr("127.0.0.1:9877");
+        customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
         message = new Message(topic, new byte[] {'a', 'b' ,'c'});
         asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher();
+        asyncArrayDispatcher.setTraceTopicName(customerTraceTopic);
+        asyncArrayDispatcher.getHostProducer();
+        asyncArrayDispatcher.getHostConsumer();
         traceProducer = asyncArrayDispatcher.getTraceProducer();
 
         producer.start();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 77d492ecb..11c1fcb91 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -53,6 +53,8 @@
     private String messageStorePlugIn = "";
     @ImportantField
     private boolean autoTraceBrokerEnable = false;
+    @ImportantField
+    private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
     /**
      * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
      * value is 1.
@@ -741,4 +743,12 @@ public boolean isAutoTraceBrokerEnable() {
     public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) {
         this.autoTraceBrokerEnable = autoTraceBrokerEnable;
     }
+
+    public String getMsgTrackTopicName() {
+        return msgTrackTopicName;
+    }
+
+    public void setMsgTrackTopicName(String msgTrackTopicName) {
+        this.msgTrackTopicName = msgTrackTopicName;
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
index 0fb9b3afb..8b10eef2a 100644
--- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
@@ -27,4 +27,24 @@ public void testConsumerFallBehindThresholdOverflow() {
         long expect = 1024L * 1024 * 1024 * 16;
         assertThat(new BrokerConfig().getConsumerFallbehindThreshold()).isEqualTo(expect);
     }
+
+    @Test
+    public void testBrokerConfigAttribute() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr("127.0.0.1:9876");
+        brokerConfig.setAutoCreateTopicEnable(false);
+        brokerConfig.setAutoTraceBrokerEnable(true);
+        brokerConfig.setBrokerName("broker-a");
+        brokerConfig.setBrokerId(0);
+        brokerConfig.setBrokerClusterName("DefaultCluster");
+        brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4");
+        assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
+        assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
+        assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4");
+        assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
+        assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
+        assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
+        assertThat(brokerConfig.isAutoTraceBrokerEnable()).isEqualTo(true);
+
+    }
 }
\ No newline at end of file
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
index fb8e37fd2..5a513e414 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
@@ -26,7 +26,7 @@
 public class TraceProducer {
     public static void main(String[] args) throws MQClientException, InterruptedException {
 
-        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
+        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true, "");
         producer.start();
 
         for (int i = 0; i < 128; i++)
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
index b9710d4a7..e0e05a8cf 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
@@ -28,7 +28,8 @@
 
 public class TracePushConsumer {
     public static void main(String[] args) throws InterruptedException, MQClientException {
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
+        //here,we use the default message track trace topic name
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true, "");
         consumer.subscribe("TopicTest", "*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         //wrong time format 2017_0422_221800
diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
index 7ba580df9..d2adac53d 100644
--- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
+++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
@@ -63,7 +63,7 @@ public MQProducer getInstance(String nameServerAddress, String group) throws MQC
             return p;
         }
 
-        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group,false);
+        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
         defaultMQProducer.setNamesrvAddr(nameServerAddress);
         MQProducer beforeProducer = null;
         beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
index 128939385..38904c0bf 100644
--- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
+++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
@@ -39,7 +39,7 @@
 
     @Before
     public void mockLoggerAppender() throws Exception {
-        DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender",false));
+        DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender"));
         doAnswer(new Answer<Void>() {
             @Override
             public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index a828e8ddf..ce739be59 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -46,7 +46,7 @@ public void create() {
     }
 
     public void create(boolean useTLS) {
-        consumer = new DefaultMQPushConsumer(consumerGroup,false);
+        consumer = new DefaultMQPushConsumer(consumerGroup);
         consumer.setInstanceName(RandomUtil.getStringByUUID());
         consumer.setNamesrvAddr(nsAddr);
         try {
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
index abd1e3b64..66767cc9f 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
@@ -24,7 +24,7 @@
 public class ProducerFactory {
 
     public static DefaultMQProducer getRMQProducer(String ns) {
-        DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID(),false);
+        DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
         producer.setNamesrvAddr(ns);
         try {
             producer.start();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 675fc2a81..6bb8caad4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -213,7 +213,7 @@ public Options buildCommandlineOptions(Options options) {
     public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById",false);
+        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById");
         defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         try {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
index 3debb3d15..9bf09ad41 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
@@ -65,7 +65,7 @@
     private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
         MixAll.TOOLS_CONSUMER_GROUP);
     private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(
-        MixAll.MONITOR_CONSUMER_GROUP,false);
+        MixAll.MONITOR_CONSUMER_GROUP);
 
     public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) {
         this.monitorConfig = monitorConfig;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services