You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/05/13 08:39:55 UTC

[rocketmq] branch develop updated: [ISSUE #1200] Polish default message trace topic trace (#1201)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e0414c0  [ISSUE #1200] Polish default message trace topic trace (#1201)
e0414c0 is described below

commit e0414c005034471d9b461b478a581a5027c793ec
Author: Heng Du <du...@apache.org>
AuthorDate: Mon May 13 16:39:46 2019 +0800

    [ISSUE #1200] Polish default message trace topic trace (#1201)
    
    Polish message trace default trace topic implementation
---
 .../client/trace/AsyncTraceDispatcher.java         | 74 +++++++++++++---------
 .../rocketmq/client/trace/TraceConstants.java      |  3 +
 2 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 87a795e..0aaadb1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,6 +16,20 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -34,21 +48,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.UUID;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.HashSet;
-
-
 import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
 
 public class AsyncTraceDispatcher implements TraceDispatcher {
@@ -73,7 +72,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
 
-
     public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
@@ -88,12 +86,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
             this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
         }
         this.traceExecuter = new ThreadPoolExecutor(//
-                10, //
-                20, //
-                1000 * 60, //
-                TimeUnit.MILLISECONDS, //
-                this.appenderQueue, //
-                new ThreadFactoryImpl("MQTraceSendThread_"));
+            10, //
+            20, //
+            1000 * 60, //
+            TimeUnit.MILLISECONDS, //
+            this.appenderQueue, //
+            new ThreadFactoryImpl("MQTraceSendThread_"));
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
@@ -266,8 +264,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 }
                 // Topic value corresponding to original message entity content
                 String topic = context.getTraceBeans().get(0).getTopic();
+                String regionId = context.getRegionId();
                 // Use  original message entity's topic as key
                 String key = topic;
+                if (!StringUtils.isBlank(regionId)) {
+                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
+                }
                 List<TraceTransferBean> transBeanList = transBeanMap.get(key);
                 if (transBeanList == null) {
                     transBeanList = new ArrayList<TraceTransferBean>();
@@ -277,14 +279,21 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 transBeanList.add(traceData);
             }
             for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
-                flushData(entry.getValue());
+                String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
+                String dataTopic = entry.getKey();
+                String regionId = null;
+                if (key.length > 1) {
+                    dataTopic = key[0];
+                    regionId = key[1];
+                }
+                flushData(entry.getValue(), dataTopic, regionId);
             }
         }
 
         /**
          * Batch sending data actually
          */
-        private void flushData(List<TraceTransferBean> transBeanList) {
+        private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
             if (transBeanList.size() == 0) {
                 return;
             }
@@ -300,7 +309,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 count++;
                 // Ensure that the size of the package should not exceed the upper limit.
                 if (buffer.length() >= traceProducer.getMaxMessageSize()) {
-                    sendTraceDataByMQ(keySet, buffer.toString());
+                    sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
                     // Clear temporary buffer after finishing
                     buffer.delete(0, buffer.length());
                     keySet.clear();
@@ -308,7 +317,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 }
             }
             if (count > 0) {
-                sendTraceDataByMQ(keySet, buffer.toString());
+                sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
             }
             transBeanList.clear();
         }
@@ -317,16 +326,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
          * Send message trace data
          *
          * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
-         * @param data   the message trace data in this batch
+         * @param data the message trace data in this batch
          */
-        private void sendTraceDataByMQ(Set<String> keySet, final String data) {
-            String topic = traceTopicName;
-            final Message message = new Message(topic, data.getBytes());
+        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
+            String traceTopic = traceTopicName;
+            if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) {
+                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
+            }
+            final Message message = new Message(traceTopic, data.getBytes());
 
             // Keyset of message trace includes msgId of or original message
             message.setKeys(keySet);
             try {
-                Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
+                Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
                 SendCallback callback = new SendCallback() {
                     @Override
                     public void onSuccess(SendResult sendResult) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index b9fd877..e61ea9d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -16,10 +16,13 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.common.MixAll;
+
 public class TraceConstants {
 
     public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
     public static final char CONTENT_SPLITOR = (char) 1;
     public static final char FIELD_SPLITOR = (char) 2;
     public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
+    public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
 }