You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/27 05:24:22 UTC
[rocketmq] branch develop updated: [ISSUE #4489]Some trace messages not being sent to the broker in time before producer shutdown.
This is an automated email from the ASF dual-hosted git repository.
zhangjidi2016 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b44d3268c [ISSUE #4489]Some trace messages not being sent to the broker in time before producer shutdown.
new cdfa5ced7 Merge pull request #4490 from zhangjidi2016/fix_trace_lost
b44d3268c is described below
commit b44d3268c7a5a30d92fa8700af876d1b063df646
Author: zhangjidi <zh...@cmss.chinamobile.com>
AuthorDate: Tue Jun 21 10:16:08 2022 +0800
[ISSUE #4489]Some trace messages not being sent to the broker in time before producer shutdown.
---
.../rocketmq/client/trace/AsyncTraceDispatcher.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 7652ee0e5..410ca1e33 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -99,12 +99,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
- 10, //
- 20, //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.appenderQueue, //
- new ThreadFactoryImpl("MQTraceSendThread_"));
+ 10, //
+ 20, //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.appenderQueue, //
+ new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
@@ -188,6 +188,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
while (System.currentTimeMillis() <= end) {
+ synchronized (taskQueueByTopic) {
+ for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+ taskInfo.sendAllData();
+ }
+ }
synchronized (traceContextQueue) {
if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
break;
@@ -252,7 +257,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
while (System.currentTimeMillis() < endTime) {
try {
TraceContext traceContext = traceContextQueue.poll(
- endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
+ endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
);
if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {