You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/27 08:09:57 UTC

[rocketmq] branch develop updated: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher` (#4180)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b6e65a805 [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher` (#4180)
b6e65a805 is described below

commit b6e65a8054a60cb5c62f5ade8a9e3927cf157185
Author: dugenkui <du...@foxmail.com>
AuthorDate: Wed Apr 27 16:09:39 2022 +0800

    [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher` (#4180)
    
    * Optimized the performance of sending traceMessage
    
    * Optimized the performance of sending traceMessage
    
    * Optimized the performance of sending traceMessage
    
    * Optimized the performance of sending traceMessage
---
 .../client/trace/AsyncTraceDispatcher.java         | 203 ++++++++++++---------
 1 file changed, 114 insertions(+), 89 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 7ff8bd77e..19c276238 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -59,12 +57,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private final int queueSize;
     private final int batchSize;
     private final int maxMsgSize;
+    private final long pollingTimeMil;
+    private final long waitTimeThresholdMil;
     private final DefaultMQProducer traceProducer;
     private final ThreadPoolExecutor traceExecutor;
     // The last discard number of log
     private AtomicLong discardCount;
     private Thread worker;
     private final ArrayBlockingQueue<TraceContext> traceContextQueue;
+    private final HashMap<String, TraceDataSegment> taskQueueByTopic;
     private ArrayBlockingQueue<Runnable> appenderQueue;
     private volatile Thread shutDownHook;
     private volatile boolean stopped = false;
@@ -72,9 +73,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private DefaultMQPushConsumerImpl hostConsumer;
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private String dispatcherId = UUID.randomUUID().toString();
-    private String traceTopicName;
+    private volatile String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
-    private AccessChannel accessChannel = AccessChannel.LOCAL;
+    private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
     private String group;
     private Type type;
 
@@ -83,8 +84,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         this.queueSize = 2048;
         this.batchSize = 100;
         this.maxMsgSize = 128000;
+        this.pollingTimeMil = 100;
+        this.waitTimeThresholdMil = 500;
         this.discardCount = new AtomicLong(0L);
         this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+        this.taskQueueByTopic = new HashMap();
         this.group = group;
         this.type = type;
 
@@ -243,113 +247,137 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         @Override
         public void run() {
             while (!stopped) {
-                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                 synchronized (traceContextQueue) {
-                    for (int i = 0; i < batchSize; i++) {
-                        TraceContext context = null;
+                    long endTime = System.currentTimeMillis() + pollingTimeMil;
+                    while (System.currentTimeMillis() < endTime) {
                         try {
-                            //get trace data element from blocking Queue - traceContextQueue
-                            context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                        }
-                        if (context != null) {
-                            contexts.add(context);
-                        } else {
-                            break;
+                            TraceContext traceContext = traceContextQueue.poll(
+                                    endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
+                            );
+
+                            if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
+                                // get the topic which the trace message will send to
+                                String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());
+
+                                // get the traceDataSegment which will save this trace message, create if null
+                                TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
+                                if (traceDataSegment == null) {
+                                    traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
+                                    taskQueueByTopic.put(traceTopicName, traceDataSegment);
+                                }
+
+                                // encode traceContext and save it into traceDataSegment
+                                // NOTE if data size in traceDataSegment more than maxMsgSize,
+                                //  a AsyncDataSendTask will be created and submitted
+                                TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
+                                traceDataSegment.addTraceTransferBean(traceTransferBean);
+                            }
+                        } catch (InterruptedException ignore) {
+                            log.debug("traceContextQueue#poll exception");
                         }
                     }
-                    if (contexts.size() > 0) {
-                        AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                        traceExecutor.submit(request);
-                    } else if (AsyncTraceDispatcher.this.stopped) {
+
+                    // NOTE send the data in traceDataSegment which the first TraceTransferBean
+                    //  is longer than waitTimeThreshold
+                    sendDataByTimeThreshold();
+
+                    if (AsyncTraceDispatcher.this.stopped) {
                         this.stopped = true;
                     }
                 }
             }
 
         }
-    }
 
-    class AsyncAppenderRequest implements Runnable {
-        List<TraceContext> contextList;
+        private void sendDataByTimeThreshold() {
+            long now = System.currentTimeMillis();
+            for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+                if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
+                    taskInfo.sendAllData();
+                }
+            }
+        }
 
-        public AsyncAppenderRequest(final List<TraceContext> contextList) {
-            if (contextList != null) {
-                this.contextList = contextList;
-            } else {
-                this.contextList = new ArrayList<TraceContext>(1);
+        private String getTraceTopicName(String regionId) {
+            AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel();
+            if (AccessChannel.CLOUD == accessChannel) {
+                return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
+
+            return AsyncTraceDispatcher.this.getTraceTopicName();
         }
+    }
 
-        @Override
-        public void run() {
-            sendTraceData(contextList);
+    class TraceDataSegment {
+        private long firstBeanAddTime;
+        private int currentMsgSize;
+        private final String traceTopicName;
+        private final String regionId;
+        private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();
+
+        TraceDataSegment(String traceTopicName, String regionId) {
+            this.traceTopicName = traceTopicName;
+            this.regionId = regionId;
         }
 
-        public void sendTraceData(List<TraceContext> contextList) {
-            Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
-            for (TraceContext context : contextList) {
-                if (context.getTraceBeans().isEmpty()) {
-                    continue;
-                }
-                // Topic value corresponding to original message entity content
-                String topic = context.getTraceBeans().get(0).getTopic();
-                String regionId = context.getRegionId();
-                // Use  original message entity's topic as key
-                String key = topic;
-                if (!StringUtils.isBlank(regionId)) {
-                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
-                }
-                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
-                if (transBeanList == null) {
-                    transBeanList = new ArrayList<TraceTransferBean>();
-                    transBeanMap.put(key, transBeanList);
-                }
-                TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
-                transBeanList.add(traceData);
-            }
-            for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
-                String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
-                String dataTopic = entry.getKey();
-                String regionId = null;
-                if (key.length > 1) {
-                    dataTopic = key[0];
-                    regionId = key[1];
-                }
-                flushData(entry.getValue(), dataTopic, regionId);
+        public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
+            initFirstBeanAddTime();
+            this.traceTransferBeanList.add(traceTransferBean);
+            this.currentMsgSize += traceTransferBean.getTransData().length();
+            if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
+                List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+                AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
+                traceExecutor.submit(asyncDataSendTask);
+
+                this.clear();
             }
         }
 
-        /**
-         * Batch sending data actually
-         */
-        private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
-            if (transBeanList.size() == 0) {
+        public void sendAllData() {
+            if (this.traceTransferBeanList.isEmpty()) {
                 return;
             }
-            // Temporary buffer
+            List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+            AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
+            traceExecutor.submit(asyncDataSendTask);
+
+            this.clear();
+        }
+
+        private void initFirstBeanAddTime() {
+            if (firstBeanAddTime == 0) {
+                firstBeanAddTime = System.currentTimeMillis();
+            }
+        }
+
+        private void clear() {
+            this.firstBeanAddTime = 0;
+            this.currentMsgSize = 0;
+            this.traceTransferBeanList.clear();
+        }
+    }
+
+
+    class AsyncDataSendTask implements Runnable {
+        private final String traceTopicName;
+        private final String regionId;
+        private final List<TraceTransferBean> traceTransferBeanList;
+
+        public AsyncDataSendTask(String traceTopicName, String regionId, List<TraceTransferBean> traceTransferBeanList) {
+            this.traceTopicName = traceTopicName;
+            this.regionId = regionId;
+            this.traceTransferBeanList = traceTransferBeanList;
+        }
+
+        @Override
+        public void run() {
             StringBuilder buffer = new StringBuilder(1024);
-            int count = 0;
             Set<String> keySet = new HashSet<String>();
-
-            for (TraceTransferBean bean : transBeanList) {
-                // Keyset of message trace includes msgId of or original message
+            for (TraceTransferBean bean : traceTransferBeanList) {
                 keySet.addAll(bean.getTransKey());
                 buffer.append(bean.getTransData());
-                count++;
-                // Ensure that the size of the package should not exceed the upper limit.
-                if (buffer.length() >= traceProducer.getMaxMessageSize()) {
-                    sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
-                    // Clear temporary buffer after finishing
-                    buffer.delete(0, buffer.length());
-                    keySet.clear();
-                    count = 0;
-                }
-            }
-            if (count > 0) {
-                sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
             }
-            transBeanList.clear();
+            sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
         }
 
         /**
@@ -357,12 +385,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
          *
          * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
          * @param data   the message trace data in this batch
+         * @param traceTopic the topic which message trace data will send to
          */
-        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
-            String traceTopic = traceTopicName;
-            if (AccessChannel.CLOUD == accessChannel) {
-                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
-            }
+        private void sendTraceDataByMQ(Set<String> keySet, final String data, String traceTopic) {
             final Message message = new Message(traceTopic, data.getBytes());
             // Keyset of message trace includes msgId of or original message
             message.setKeys(keySet);