You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/27 05:24:22 UTC

[rocketmq] branch develop updated: [ISSUE #4489] Some trace messages not being sent to the broker in time before producer shutdown.

This is an automated email from the ASF dual-hosted git repository.

zhangjidi2016 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b44d3268c [ISSUE #4489] Some trace messages not being sent to the broker in time before producer shutdown.
     new cdfa5ced7 Merge pull request #4490 from zhangjidi2016/fix_trace_lost
b44d3268c is described below

commit b44d3268c7a5a30d92fa8700af876d1b063df646
Author: zhangjidi <zh...@cmss.chinamobile.com>
AuthorDate: Tue Jun 21 10:16:08 2022 +0800

    [ISSUE #4489] Some trace messages not being sent to the broker in time before producer shutdown.
---
 .../rocketmq/client/trace/AsyncTraceDispatcher.java   | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 7652ee0e5..410ca1e33 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -99,12 +99,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
             this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
         }
         this.traceExecutor = new ThreadPoolExecutor(//
-                10, //
-                20, //
-                1000 * 60, //
-                TimeUnit.MILLISECONDS, //
-                this.appenderQueue, //
-                new ThreadFactoryImpl("MQTraceSendThread_"));
+            10, //
+            20, //
+            1000 * 60, //
+            TimeUnit.MILLISECONDS, //
+            this.appenderQueue, //
+            new ThreadFactoryImpl("MQTraceSendThread_"));
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
@@ -188,6 +188,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         // Maximum time to wait for the flush; this bound prevents continuous writes from keeping the method from returning.
         long end = System.currentTimeMillis() + 500;
         while (System.currentTimeMillis() <= end) {
+            synchronized (taskQueueByTopic) {
+                for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+                    taskInfo.sendAllData();
+                }
+            }
             synchronized (traceContextQueue) {
                 if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
                     break;
@@ -252,7 +257,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                     while (System.currentTimeMillis() < endTime) {
                         try {
                             TraceContext traceContext = traceContextQueue.poll(
-                                    endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
+                                endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
                             );
 
                             if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {