Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/17 17:34:46 UTC

[GitHub] [rocketmq] dugenkui03 opened a new pull request, #4180: [ISSUE #4099]Optimized the performance of sending traceMessage

dugenkui03 opened a new pull request, #4180:
URL: https://github.com/apache/rocketmq/pull/4180

   ## What is the purpose of the change
   
   Details in https://github.com/apache/rocketmq/pull/4099#issuecomment-1099351837
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] coveralls commented on pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#issuecomment-1100924383

   
   [![Coverage Status](https://coveralls.io/builds/48334800/badge)](https://coveralls.io/builds/48334800)
   
   Coverage increased (+0.04%) to 52.017% when pulling **64488bd545913c6c35d44e117dacbc3599c19ba0 on dugenkui03:patch-007** into **50e314eb8ada23283dcdbb261766183c90fd435c on apache:develop**.
   




[GitHub] [rocketmq] dongeforever commented on a diff in pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher`

Posted by GitBox <gi...@apache.org>.
dongeforever commented on code in PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#discussion_r853767970


##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -243,113 +247,133 @@ class AsyncRunnable implements Runnable {
         @Override
         public void run() {
             while (!stopped) {
-                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                 synchronized (traceContextQueue) {
-                    for (int i = 0; i < batchSize; i++) {
-                        TraceContext context = null;
+                    long endTime = System.currentTimeMillis() + pollingTimeMil;
+                    while (System.currentTimeMillis() < endTime) {
                         try {
-                            //get trace data element from blocking Queue - traceContextQueue
-                            context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                        }
-                        if (context != null) {
-                            contexts.add(context);
-                        } else {
-                            break;
+                            TraceContext traceContext = traceContextQueue.poll(
+                                    endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
+                            );
+
+                            if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
+                                // get the topic which the trace message will send to
+                                String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());
+
+                                // get the traceDataSegment which will save this trace message, create if null
+                                TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
+                                if (traceDataSegment == null) {
+                                    traceDataSegment = new TraceDataSegment(traceContext.getRegionId());
+                                    taskQueueByTopic.put(traceTopicName, traceDataSegment);
+                                }
+
+                                // encode traceContext and save it into traceDataSegment
+                                // NOTE if data size in traceDataSegment more than maxMsgSize,
+                                //  a AsyncDataSendTask will be created and submitted
+                                TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
+                                traceDataSegment.addTraceTransferBean(traceTransferBean);
+                            }
+                        } catch (InterruptedException ignore) {
+                            log.debug("traceContextQueue#poll exception");
                         }
                     }
-                    if (contexts.size() > 0) {
-                        AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                        traceExecutor.submit(request);
-                    } else if (AsyncTraceDispatcher.this.stopped) {
+
+                    // NOTE send the data in traceDataSegment which the first TraceTransferBean
+                    //  is longer than waitTimeThreshold
+                    sendDataByTimeThreshold();
+
+                    if (AsyncTraceDispatcher.this.stopped) {
                         this.stopped = true;
                     }
                 }
             }
 
         }
-    }
 
-    class AsyncAppenderRequest implements Runnable {
-        List<TraceContext> contextList;
+        private void sendDataByTimeThreshold() {
+            long now = System.currentTimeMillis();
+            for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+                if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
+                    taskInfo.sendAllData();
+                }
+            }
+        }
 
-        public AsyncAppenderRequest(final List<TraceContext> contextList) {
-            if (contextList != null) {
-                this.contextList = contextList;
-            } else {
-                this.contextList = new ArrayList<TraceContext>(1);
+        private String getTraceTopicName(String regionId) {
+            AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel();
+            if (AccessChannel.CLOUD == accessChannel) {
+                return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
+
+            return AsyncTraceDispatcher.this.getTraceTopicName();
         }
+    }
 
-        @Override
-        public void run() {
-            sendTraceData(contextList);
+    class TraceDataSegment {
+        public long firstBeanAddTime;
+        public int currentMsgSize;
+        public final String regionId;
+        public final List<TraceTransferBean> traceTransferBeanList = new ArrayList();
+
+        TraceDataSegment(String regionId) {
+            this.regionId = regionId;
         }
 
-        public void sendTraceData(List<TraceContext> contextList) {
-            Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
-            for (TraceContext context : contextList) {
-                if (context.getTraceBeans().isEmpty()) {
-                    continue;
-                }
-                // Topic value corresponding to original message entity content
-                String topic = context.getTraceBeans().get(0).getTopic();
-                String regionId = context.getRegionId();
-                // Use  original message entity's topic as key
-                String key = topic;
-                if (!StringUtils.isBlank(regionId)) {
-                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
-                }
-                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
-                if (transBeanList == null) {
-                    transBeanList = new ArrayList<TraceTransferBean>();
-                    transBeanMap.put(key, transBeanList);
-                }
-                TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
-                transBeanList.add(traceData);
-            }
-            for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
-                String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
-                String dataTopic = entry.getKey();
-                String regionId = null;
-                if (key.length > 1) {
-                    dataTopic = key[0];
-                    regionId = key[1];
-                }
-                flushData(entry.getValue(), dataTopic, regionId);
+        public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
+            initFirstBeanAddTime();
+            this.traceTransferBeanList.add(traceTransferBean);
+            this.currentMsgSize += traceTransferBean.getTransData().length();
+            if (currentMsgSize >= maxMsgSize) {
+                List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+                AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(regionId, dataToSend);
+                traceExecutor.submit(asyncDataSendTask);
+
+                this.clear();
             }
         }
 
-        /**
-         * Batch sending data actually
-         */
-        private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
-            if (transBeanList.size() == 0) {
+        public void sendAllData() {
+            if (this.traceTransferBeanList.isEmpty()) {
                 return;
             }
-            // Temporary buffer
+            List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+            AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(regionId, dataToSend);
+            traceExecutor.submit(asyncDataSendTask);
+
+            this.clear();
+        }
+
+        private void initFirstBeanAddTime() {
+            if (firstBeanAddTime == 0) {
+                firstBeanAddTime = System.currentTimeMillis();
+            }
+        }
+
+        private void clear() {
+            this.firstBeanAddTime = 0;
+            this.currentMsgSize = 0;
+            this.traceTransferBeanList.clear();
+        }
+    }
+
+
+    class AsyncDataSendTask implements Runnable {
+        public final String regionId;
+        public final List<TraceTransferBean> traceTransferBeanList;
+
+        public AsyncDataSendTask(String regionId, List<TraceTransferBean> traceTransferBeanList) {
+            this.regionId = regionId;
+            this.traceTransferBeanList = traceTransferBeanList;
+        }
+
+        @Override
+        public void run() {
             StringBuilder buffer = new StringBuilder(1024);

Review Comment:
   It is good practice to catch unchecked exceptions inside a Runnable when the Future result is never read: the thread pool swallows the exception, so we get no information if something unexpected happens.
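The behaviour described in this comment can be reproduced with the standard `java.util.concurrent` API. The following is a minimal sketch, not code from the PR: the class name `SwallowedExceptionDemo`, the `guarded` wrapper, and the `lastReported` field (a stand-in for a logger) are all hypothetical. It shows that a task passed to `ExecutorService.submit` keeps its exception inside the returned `Future` until `get()` is called, and that a catch-all wrapper reports the failure even when nobody reads the `Future`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SwallowedExceptionDemo {
    // Hypothetical stand-in for a logger: remembers the last reported throwable.
    static final AtomicReference<Throwable> lastReported = new AtomicReference<>();

    // Wraps a task so that any throwable is reported instead of silently lost.
    static Runnable guarded(final Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                lastReported.set(t);
                System.err.println("task failed: " + t);
            }
        };
    }

    // Submits a failing task and returns the exception captured in its Future.
    static Throwable failureHiddenInFuture(ExecutorService pool) throws InterruptedException {
        Runnable failing = () -> {
            throw new IllegalStateException("boom");
        };
        Future<?> future = pool.submit(failing);
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // The original exception only becomes visible here, via get().
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("hidden in Future: " + failureHiddenInFuture(pool));

            Runnable failing = () -> {
                throw new IllegalStateException("boom");
            };
            // The guard reports the failure; get() is used only to wait for completion.
            pool.submit(guarded(failing)).get();
            System.out.println("reported by guard: " + lastReported.get());
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}
```

In a dispatcher that calls `traceExecutor.submit(...)` and discards the result, a guard of this kind (or an equivalent `try`/`catch` at the top of `run()`) is one way to make sure a failure leaves a log line.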



##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -358,7 +382,7 @@ private void flushData(List<TraceTransferBean> transBeanList, String dataTopic,
          * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
          * @param data   the message trace data in this batch
          */
-        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
+        private void sendTraceDataByMQ(Set<String> keySet, final String data, String regionId) {
             String traceTopic = traceTopicName;

Review Comment:
   The topic was already generated by getTraceTopicName earlier in the process.
   It would be better to reuse that topic here instead of generating it again.
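One way to read this suggestion is that the send task should receive the already-resolved topic rather than a regionId from which the topic is derived a second time. The sketch below illustrates that shape under stated assumptions: `TopicReuseSketch`, `resolveTopic`, `SendTask`, and the `"TRACE_PREFIX_"` string are hypothetical placeholders, not the PR's classes or the real `TraceConstants` value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TopicReuseSketch {
    // Counts how often the topic is resolved, to make the reuse visible.
    static int resolveCalls = 0;

    // Hypothetical resolver with the same shape as getTraceTopicName(regionId) in the diff.
    static String resolveTopic(boolean cloudChannel, String regionId, String defaultTopic) {
        resolveCalls++;
        // "TRACE_PREFIX_" is a placeholder, not the real constant.
        return cloudChannel ? "TRACE_PREFIX_" + regionId : defaultTopic;
    }

    // A send task that is handed the resolved topic instead of a regionId.
    static final class SendTask implements Runnable {
        final String traceTopic;
        final List<String> payloads;
        final List<String> sent = new ArrayList<>();

        SendTask(String traceTopic, List<String> payloads) {
            this.traceTopic = traceTopic;
            this.payloads = payloads;
        }

        @Override
        public void run() {
            for (String payload : payloads) {
                // A real task would hand the message to a producer here.
                sent.add(traceTopic + " <- " + payload);
            }
        }
    }

    public static void main(String[] args) {
        // The topic is resolved once, by the code that groups the trace data.
        String topic = resolveTopic(true, "region-a", "DEFAULT_TRACE_TOPIC");
        SendTask task = new SendTask(topic, Arrays.asList("a", "b"));
        task.run();
        System.out.println(task.sent + ", resolveCalls=" + resolveCalls);
    }
}
```

Passing the topic through keeps the resolution logic in one place, so the grouping key and the destination cannot drift apart.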
   
   





[GitHub] [rocketmq] dugenkui03 commented on a diff in pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher`

Posted by GitBox <gi...@apache.org>.
dugenkui03 commented on code in PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#discussion_r857141825


##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -243,113 +247,133 @@ class AsyncRunnable implements Runnable {
             StringBuilder buffer = new StringBuilder(1024);

Review Comment:
   The invoked method `sendTraceDataByMQ()` already contains a `try`/`catch`. Is that enough?
   
   <img width="760" alt="image" src="https://user-images.githubusercontent.com/18216266/164983447-e3739a8d-4741-4b53-bd8d-44a555127475.png">
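A general point that bears on this question: a `try`/`catch` inside a callee covers only the callee's own body, so anything that throws earlier in `run()` still escapes. The sketch below illustrates this with hypothetical names (`CatchScopeSketch`, `sendGuarded`, `runTask`, `runTaskGuarded`). Whether it applies to the PR depends on what executes in `run()` before `sendTraceDataByMQ()` is reached, which the quoted hunk does not show in full.

```java
public class CatchScopeSketch {
    // Hypothetical stand-in for a send method that guards only its own body.
    static void sendGuarded(String data) {
        try {
            if (data.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        } catch (Exception e) {
            System.err.println("send failed: " + e);
        }
    }

    // Hypothetical task body: the loop runs outside the callee's try/catch.
    static void runTask(String[] beans) {
        StringBuilder buffer = new StringBuilder(1024);
        for (String bean : beans) {
            buffer.append(bean.trim()); // throws NullPointerException on a null bean
        }
        sendGuarded(buffer.toString());
    }

    // Outer guard over the whole task; returns the escaped failure, or null if none.
    static Throwable runTaskGuarded(String[] beans) {
        try {
            runTask(beans);
            return null;
        } catch (Throwable t) {
            System.err.println("task failed: " + t);
            return t;
        }
    }

    public static void main(String[] args) {
        // Failure inside the callee: handled by the inner catch, nothing escapes.
        System.out.println(runTaskGuarded(new String[] {""}));
        // Failure before the callee: only the outer guard sees it.
        System.out.println(runTaskGuarded(new String[] {"a", null}));
    }
}
```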
   





[GitHub] [rocketmq] codecov-commenter commented on pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#issuecomment-1100924264

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4180?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4180](https://codecov.io/gh/apache/rocketmq/pull/4180?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (64488bd) into [develop](https://codecov.io/gh/apache/rocketmq/commit/fd554ab12072225325c957ff6bdf492fc67821af?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fd554ab) will **increase** coverage by `0.06%`.
   > The diff coverage is `83.33%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #4180      +/-   ##
   =============================================
   + Coverage      47.92%   47.98%   +0.06%     
   - Complexity      5002     5011       +9     
   =============================================
     Files            634      635       +1     
     Lines          42529    42484      -45     
     Branches        5573     5559      -14     
   =============================================
   + Hits           20381    20388       +7     
   + Misses         19647    19608      -39     
   + Partials        2501     2488      -13     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/4180?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/rocketmq/client/trace/AsyncTraceDispatcher.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvdHJhY2UvQXN5bmNUcmFjZURpc3BhdGNoZXIuamF2YQ==) | `80.09% <83.33%> (+0.39%)` | :arrow_up: |
   | [...in/java/org/apache/rocketmq/test/util/MQAdmin.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW4uamF2YQ==) | `38.88% <0.00%> (-5.56%)` | :arrow_down: |
   | [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...ketmq/common/protocol/body/ConsumerConnection.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9Db25zdW1lckNvbm5lY3Rpb24uamF2YQ==) | `95.83% <0.00%> (-4.17%)` | :arrow_down: |
   | [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `46.88% <0.00%> (-4.03%)` | :arrow_down: |
   | [...ava/org/apache/rocketmq/test/util/VerifyUtils.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL1ZlcmlmeVV0aWxzLmphdmE=) | `46.26% <0.00%> (-2.99%)` | :arrow_down: |
   | [...ava/org/apache/rocketmq/filter/util/BitsArray.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CaXRzQXJyYXkuamF2YQ==) | `59.82% <0.00%> (-2.57%)` | :arrow_down: |
   | [...va/org/apache/rocketmq/logging/inner/Appender.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bG9nZ2luZy9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvbG9nZ2luZy9pbm5lci9BcHBlbmRlci5qYXZh) | `34.83% <0.00%> (-2.25%)` | :arrow_down: |
   | [...rg/apache/rocketmq/remoting/netty/NettyLogger.java](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5TG9nZ2VyLmphdmE=) | `14.96% <0.00%> (-1.37%)` | :arrow_down: |
   | ... and [31 more](https://codecov.io/gh/apache/rocketmq/pull/4180/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/4180?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4180?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [fd554ab...64488bd](https://codecov.io/gh/apache/rocketmq/pull/4180?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [rocketmq] duhenglucky merged pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher`

Posted by GitBox <gi...@apache.org>.
duhenglucky merged PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180




[GitHub] [rocketmq] dugenkui03 commented on pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher`

Posted by GitBox <gi...@apache.org>.
dugenkui03 commented on PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#issuecomment-1108018691

   @dongeforever Thanks for your review. This PR has been updated; please take another look.




[GitHub] [rocketmq] dongeforever commented on a diff in pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher`

Posted by GitBox <gi...@apache.org>.
dongeforever commented on code in PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#discussion_r859501837


##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -243,113 +247,133 @@ class AsyncRunnable implements Runnable {
         @Override
         public void run() {
             while (!stopped) {
-                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                 synchronized (traceContextQueue) {
-                    for (int i = 0; i < batchSize; i++) {
-                        TraceContext context = null;
+                    long endTime = System.currentTimeMillis() + pollingTimeMil;
+                    while (System.currentTimeMillis() < endTime) {
                         try {
-                            //get trace data element from blocking Queue - traceContextQueue
-                            context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                        }
-                        if (context != null) {
-                            contexts.add(context);
-                        } else {
-                            break;
+                            TraceContext traceContext = traceContextQueue.poll(
+                                    endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
+                            );
+
+                            if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
+                                // get the topic which the trace message will send to
+                                String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());
+
+                                // get the traceDataSegment which will save this trace message, create if null
+                                TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
+                                if (traceDataSegment == null) {
+                                    traceDataSegment = new TraceDataSegment(traceContext.getRegionId());
+                                    taskQueueByTopic.put(traceTopicName, traceDataSegment);
+                                }
+
+                                // encode traceContext and save it into traceDataSegment
+                                // NOTE if data size in traceDataSegment more than maxMsgSize,
+                                //  a AsyncDataSendTask will be created and submitted
+                                TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
+                                traceDataSegment.addTraceTransferBean(traceTransferBean);
+                            }
+                        } catch (InterruptedException ignore) {
+                            log.debug("traceContextQueue#poll exception");
                         }
                     }
-                    if (contexts.size() > 0) {
-                        AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                        traceExecutor.submit(request);
-                    } else if (AsyncTraceDispatcher.this.stopped) {
+
+                    // NOTE send the data in traceDataSegment which the first TraceTransferBean
+                    //  is longer than waitTimeThreshold
+                    sendDataByTimeThreshold();
+
+                    if (AsyncTraceDispatcher.this.stopped) {
                         this.stopped = true;
                     }
                 }
             }
 
         }
-    }
 
-    class AsyncAppenderRequest implements Runnable {
-        List<TraceContext> contextList;
+        private void sendDataByTimeThreshold() {
+            long now = System.currentTimeMillis();
+            for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+                if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
+                    taskInfo.sendAllData();
+                }
+            }
+        }
 
-        public AsyncAppenderRequest(final List<TraceContext> contextList) {
-            if (contextList != null) {
-                this.contextList = contextList;
-            } else {
-                this.contextList = new ArrayList<TraceContext>(1);
+        private String getTraceTopicName(String regionId) {
+            AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel();
+            if (AccessChannel.CLOUD == accessChannel) {
+                return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
+
+            return AsyncTraceDispatcher.this.getTraceTopicName();
         }
+    }
 
-        @Override
-        public void run() {
-            sendTraceData(contextList);
+    class TraceDataSegment {
+        public long firstBeanAddTime;
+        public int currentMsgSize;
+        public final String regionId;
+        public final List<TraceTransferBean> traceTransferBeanList = new ArrayList();
+
+        TraceDataSegment(String regionId) {
+            this.regionId = regionId;
         }
 
-        public void sendTraceData(List<TraceContext> contextList) {
-            Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
-            for (TraceContext context : contextList) {
-                if (context.getTraceBeans().isEmpty()) {
-                    continue;
-                }
-                // Topic value corresponding to original message entity content
-                String topic = context.getTraceBeans().get(0).getTopic();
-                String regionId = context.getRegionId();
-                // Use  original message entity's topic as key
-                String key = topic;
-                if (!StringUtils.isBlank(regionId)) {
-                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
-                }
-                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
-                if (transBeanList == null) {
-                    transBeanList = new ArrayList<TraceTransferBean>();
-                    transBeanMap.put(key, transBeanList);
-                }
-                TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
-                transBeanList.add(traceData);
-            }
-            for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
-                String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
-                String dataTopic = entry.getKey();
-                String regionId = null;
-                if (key.length > 1) {
-                    dataTopic = key[0];
-                    regionId = key[1];
-                }
-                flushData(entry.getValue(), dataTopic, regionId);
+        public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
+            initFirstBeanAddTime();
+            this.traceTransferBeanList.add(traceTransferBean);
+            this.currentMsgSize += traceTransferBean.getTransData().length();
+            if (currentMsgSize >= maxMsgSize) {
+                List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+                AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(regionId, dataToSend);
+                traceExecutor.submit(asyncDataSendTask);
+
+                this.clear();
             }
         }
 
-        /**
-         * Batch sending data actually
-         */
-        private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
-            if (transBeanList.size() == 0) {
+        public void sendAllData() {
+            if (this.traceTransferBeanList.isEmpty()) {
                 return;
             }
-            // Temporary buffer
+            List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+            AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(regionId, dataToSend);
+            traceExecutor.submit(asyncDataSendTask);
+
+            this.clear();
+        }
+
+        private void initFirstBeanAddTime() {
+            if (firstBeanAddTime == 0) {
+                firstBeanAddTime = System.currentTimeMillis();
+            }
+        }
+
+        private void clear() {
+            this.firstBeanAddTime = 0;
+            this.currentMsgSize = 0;
+            this.traceTransferBeanList.clear();
+        }
+    }
+
+
+    class AsyncDataSendTask implements Runnable {
+        public final String regionId;
+        public final List<TraceTransferBean> traceTransferBeanList;
+
+        public AsyncDataSendTask(String regionId, List<TraceTransferBean> traceTransferBeanList) {
+            this.regionId = regionId;
+            this.traceTransferBeanList = traceTransferBeanList;
+        }
+
+        @Override
+        public void run() {
             StringBuilder buffer = new StringBuilder(1024);

Review Comment:
   This is sufficient for now, since the code outside the try-catch is simple, but it may introduce vulnerabilities if someone later adds code there.
   If you have spare time, it would be better to follow best practice here.
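
As an illustration of the point above, here is a minimal sketch of one defensive pattern: keeping the entire body of `run()`, setup included, inside a single try-catch, so that code added later is still covered. This is only a sketch under assumptions and not code from this PR. `GuardedTaskSketch`, `GuardedSendTask`, and the `sent`/`failures` lists are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these names come from AsyncTraceDispatcher.
class GuardedTaskSketch {

    /** A task whose entire body, including setup, sits inside one try-catch. */
    static class GuardedSendTask implements Runnable {
        private final List<String> batch;
        private final List<String> sent;
        private final List<Throwable> failures;

        GuardedSendTask(List<String> batch, List<String> sent, List<Throwable> failures) {
            this.batch = batch;
            this.sent = sent;
            this.failures = failures;
        }

        @Override
        public void run() {
            try {
                // Setup code lives inside the guard too, so anything added
                // here later is still covered.
                StringBuilder buffer = new StringBuilder(1024);
                for (String item : batch) {
                    buffer.append(item.trim()); // throws NullPointerException on a null item
                }
                sent.add(buffer.toString());
            } catch (Throwable t) {
                // Record the failure instead of letting it escape the task.
                failures.add(t);
            }
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        List<Throwable> failures = new ArrayList<>();
        new GuardedSendTask(Arrays.asList("a ", " b"), sent, failures).run();
        new GuardedSendTask(Arrays.asList("c", null), sent, failures).run();
        System.out.println(sent + " failures=" + failures.size()); // prints: [ab] failures=1
    }
}
```

One reason this can matter for tasks handed to `ExecutorService.submit`: an exception thrown by the task is stored in the returned `Future` and is never seen unless someone calls `get()` on it.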





[GitHub] [rocketmq] dugenkui03 commented on a diff in pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher`

Posted by GitBox <gi...@apache.org>.
dugenkui03 commented on code in PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#discussion_r857142145


##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -358,7 +382,7 @@ private void flushData(List<TraceTransferBean> transBeanList, String dataTopic,
          * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
          * @param data   the message trace data in this batch
          */
-        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
+        private void sendTraceDataByMQ(Set<String> keySet, final String data, String regionId) {
             String traceTopic = traceTopicName;

Review Comment:
   Thanks a lot. I did intend to use the topic name generated in the previous step, but forgot.
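
A rough sketch of that idea, assuming the topic name is resolved once when a segment is created and then carried into the send task rather than recomputed at send time. All names here (`TopicCarrySketch`, `Segment`, `SendTask`, `resolveTopic`) and the `PREFIX` value are hypothetical placeholders for illustration, not the actual change in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names and the prefix value are placeholders.
class TopicCarrySketch {
    static final String PREFIX = "TRACE_PREFIX_";

    /** Resolve the trace topic once: per region on the cloud channel, else the default. */
    static String resolveTopic(boolean cloud, String regionId, String defaultTopic) {
        return cloud ? PREFIX + regionId : defaultTopic;
    }

    /** The send task receives the already-resolved topic name. */
    static class SendTask {
        final String traceTopic;
        final List<String> data;

        SendTask(String traceTopic, List<String> data) {
            this.traceTopic = traceTopic;
            this.data = data;
        }

        String describe() {
            return traceTopic + ":" + data.size();
        }
    }

    /** A segment remembers the topic it was created for. */
    static class Segment {
        final String traceTopic;
        final List<String> beans = new ArrayList<>();

        Segment(String traceTopic) {
            this.traceTopic = traceTopic;
        }

        void add(String bean) {
            beans.add(bean);
        }

        /** Copy the buffered beans into a task, then reset the buffer. */
        SendTask flush() {
            SendTask task = new SendTask(traceTopic, new ArrayList<>(beans));
            beans.clear();
            return task;
        }
    }

    public static void main(String[] args) {
        Segment segment = new Segment(resolveTopic(true, "cn-north", "DEFAULT_TRACE_TOPIC"));
        segment.add("bean-1");
        segment.add("bean-2");
        System.out.println(segment.flush().describe()); // prints: TRACE_PREFIX_cn-north:2
    }
}
```

Carrying the name this way keeps the topic-selection logic in one place, so the send path cannot drift from the grouping key.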





[GitHub] [rocketmq] coveralls commented on pull request #4180: [ISSUE #4099]Optimized the performance of sending traceMessage

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#issuecomment-1100924380

   
   [![Coverage Status](https://coveralls.io/builds/48334800/badge)](https://coveralls.io/builds/48334800)
   
   Coverage increased (+0.04%) to 52.017% when pulling **64488bd545913c6c35d44e117dacbc3599c19ba0 on dugenkui03:patch-007** into **50e314eb8ada23283dcdbb261766183c90fd435c on apache:develop**.
   

