You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/06/25 02:47:56 UTC
[rocketmq] branch develop updated: [ISSUE #2988] fix fail to send trace of last message before shutting down producer
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a2f8810 [ISSUE #2988] fix fail to send trace of last message before shutting down producer
a2f8810 is described below
commit a2f8810c9adedcd82fd4cb9a69b17128a1a96b5e
Author: yuz10 <84...@qq.com>
AuthorDate: Fri Jun 25 10:47:48 2021 +0800
[ISSUE #2988] fix fail to send trace of last message before shutting down producer
---
.../client/trace/AsyncTraceDispatcher.java | 70 ++++++++++++----------
1 file changed, 37 insertions(+), 33 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 754cbd5..7ff8bd7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.trace;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
@@ -64,7 +64,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
- private ArrayBlockingQueue<TraceContext> traceContextQueue;
+ private final ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
@@ -78,7 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String group;
private Type type;
- public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) {
+ public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
@@ -95,12 +95,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
- 10, //
- 20, //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.appenderQueue, //
- new ThreadFactoryImpl("MQTraceSendThread_"));
+ 10, //
+ 20, //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.appenderQueue, //
+ new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
@@ -180,10 +180,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
}
@Override
- public void flush() throws IOException {
+ public void flush() {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
- while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
+ while (System.currentTimeMillis() <= end) {
+ synchronized (traceContextQueue) {
+ if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
+ break;
+ }
+ }
try {
Thread.sleep(1);
} catch (InterruptedException e) {
@@ -196,6 +201,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override
public void shutdown() {
this.stopped = true;
+ flush();
this.traceExecutor.shutdown();
if (isStarted.get()) {
traceProducer.shutdown();
@@ -212,11 +218,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
- try {
- flush();
- } catch (IOException e) {
- log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
- }
+ flush();
}
}
}
@@ -242,25 +244,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
- for (int i = 0; i < batchSize; i++) {
- TraceContext context = null;
- try {
- //get trace data element from blocking Queue — traceContextQueue
- context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
+ synchronized (traceContextQueue) {
+ for (int i = 0; i < batchSize; i++) {
+ TraceContext context = null;
+ try {
+ //get trace data element from blocking Queue - traceContextQueue
+ context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ if (context != null) {
+ contexts.add(context);
+ } else {
+ break;
+ }
}
- if (context != null) {
- contexts.add(context);
- } else {
- break;
+ if (contexts.size() > 0) {
+ AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
+ traceExecutor.submit(request);
+ } else if (AsyncTraceDispatcher.this.stopped) {
+ this.stopped = true;
}
}
- if (contexts.size() > 0) {
- AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
- traceExecutor.submit(request);
- } else if (AsyncTraceDispatcher.this.stopped) {
- this.stopped = true;
- }
}
}
@@ -352,7 +356,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
- * @param data the message trace data in this batch
+ * @param data the message trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;