You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/06/25 02:47:56 UTC

[rocketmq] branch develop updated: [ISSUE #2988] fix fail to send trace of last message before shutting down producer

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a2f8810  [ISSUE #2988] fix fail to send trace of last message before shutting down producer
a2f8810 is described below

commit a2f8810c9adedcd82fd4cb9a69b17128a1a96b5e
Author: yuz10 <84...@qq.com>
AuthorDate: Fri Jun 25 10:47:48 2021 +0800

    [ISSUE #2988] fix fail to send trace of last message before shutting down producer
---
 .../client/trace/AsyncTraceDispatcher.java         | 70 ++++++++++++----------
 1 file changed, 37 insertions(+), 33 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 754cbd5..7ff8bd7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.client.trace;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
@@ -64,7 +64,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     // The last discard number of log
     private AtomicLong discardCount;
     private Thread worker;
-    private ArrayBlockingQueue<TraceContext> traceContextQueue;
+    private final ArrayBlockingQueue<TraceContext> traceContextQueue;
     private ArrayBlockingQueue<Runnable> appenderQueue;
     private volatile Thread shutDownHook;
     private volatile boolean stopped = false;
@@ -78,7 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private String group;
     private Type type;
 
-    public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) {
+    public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
         this.batchSize = 100;
@@ -95,12 +95,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
             this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
         }
         this.traceExecutor = new ThreadPoolExecutor(//
-            10, //
-            20, //
-            1000 * 60, //
-            TimeUnit.MILLISECONDS, //
-            this.appenderQueue, //
-            new ThreadFactoryImpl("MQTraceSendThread_"));
+                10, //
+                20, //
+                1000 * 60, //
+                TimeUnit.MILLISECONDS, //
+                this.appenderQueue, //
+                new ThreadFactoryImpl("MQTraceSendThread_"));
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
@@ -180,10 +180,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     }
 
     @Override
-    public void flush() throws IOException {
+    public void flush() {
         // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
         long end = System.currentTimeMillis() + 500;
-        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
+        while (System.currentTimeMillis() <= end) {
+            synchronized (traceContextQueue) {
+                if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
+                    break;
+                }
+            }
             try {
                 Thread.sleep(1);
             } catch (InterruptedException e) {
@@ -196,6 +201,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     @Override
     public void shutdown() {
         this.stopped = true;
+        flush();
         this.traceExecutor.shutdown();
         if (isStarted.get()) {
             traceProducer.shutdown();
@@ -212,11 +218,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 public void run() {
                     synchronized (this) {
                         if (!this.hasShutdown) {
-                            try {
-                                flush();
-                            } catch (IOException e) {
-                                log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
-                            }
+                            flush();
                         }
                     }
                 }
@@ -242,25 +244,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         public void run() {
             while (!stopped) {
                 List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
-                for (int i = 0; i < batchSize; i++) {
-                    TraceContext context = null;
-                    try {
-                        //get trace data element from blocking Queue — traceContextQueue
-                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
-                    } catch (InterruptedException e) {
+                synchronized (traceContextQueue) {
+                    for (int i = 0; i < batchSize; i++) {
+                        TraceContext context = null;
+                        try {
+                            //get trace data element from blocking Queue - traceContextQueue
+                            context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException e) {
+                        }
+                        if (context != null) {
+                            contexts.add(context);
+                        } else {
+                            break;
+                        }
                     }
-                    if (context != null) {
-                        contexts.add(context);
-                    } else {
-                        break;
+                    if (contexts.size() > 0) {
+                        AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
+                        traceExecutor.submit(request);
+                    } else if (AsyncTraceDispatcher.this.stopped) {
+                        this.stopped = true;
                     }
                 }
-                if (contexts.size() > 0) {
-                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                    traceExecutor.submit(request);
-                } else if (AsyncTraceDispatcher.this.stopped) {
-                    this.stopped = true;
-                }
             }
 
         }
@@ -352,7 +356,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
          * Send message trace data
          *
          * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
-         * @param data the message trace data in this batch
+         * @param data   the message trace data in this batch
          */
         private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
             String traceTopic = traceTopicName;