You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/27 08:09:57 UTC
[rocketmq] branch develop updated: [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher` (#4180)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b6e65a805 [ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher` (#4180)
b6e65a805 is described below
commit b6e65a8054a60cb5c62f5ade8a9e3927cf157185
Author: dugenkui <du...@foxmail.com>
AuthorDate: Wed Apr 27 16:09:39 2022 +0800
[ISSUE #4099]Optimized the performance of sending traceMessage in `AsyncTraceDispatcher` (#4180)
* Optimized the performance of sending traceMessage
* Optimized the performance of sending traceMessage
* Optimized the performance of sending traceMessage
* Optimized the performance of sending traceMessage
---
.../client/trace/AsyncTraceDispatcher.java | 203 ++++++++++++---------
1 file changed, 114 insertions(+), 89 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 7ff8bd77e..19c276238 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
@@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -59,12 +57,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
+ private final long pollingTimeMil;
+ private final long waitTimeThresholdMil;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private final ArrayBlockingQueue<TraceContext> traceContextQueue;
+ private final HashMap<String, TraceDataSegment> taskQueueByTopic;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
@@ -72,9 +73,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
- private String traceTopicName;
+ private volatile String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
- private AccessChannel accessChannel = AccessChannel.LOCAL;
+ private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
@@ -83,8 +84,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
+ this.pollingTimeMil = 100;
+ this.waitTimeThresholdMil = 500;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+ this.taskQueueByTopic = new HashMap();
this.group = group;
this.type = type;
@@ -243,113 +247,137 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override
public void run() {
while (!stopped) {
- List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
synchronized (traceContextQueue) {
- for (int i = 0; i < batchSize; i++) {
- TraceContext context = null;
+ long endTime = System.currentTimeMillis() + pollingTimeMil;
+ while (System.currentTimeMillis() < endTime) {
try {
- //get trace data element from blocking Queue - traceContextQueue
- context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- }
- if (context != null) {
- contexts.add(context);
- } else {
- break;
+ TraceContext traceContext = traceContextQueue.poll(
+ endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
+ );
+
+ if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
+ // get the topic which the trace message will send to
+ String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());
+
+ // get the traceDataSegment which will save this trace message, create if null
+ TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
+ if (traceDataSegment == null) {
+ traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
+ taskQueueByTopic.put(traceTopicName, traceDataSegment);
+ }
+
+ // encode traceContext and save it into traceDataSegment
+ // NOTE: if the data size in traceDataSegment reaches traceProducer's max message size,
+ // an AsyncDataSendTask will be created and submitted
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
+ traceDataSegment.addTraceTransferBean(traceTransferBean);
+ }
+ } catch (InterruptedException ignore) {
+ log.debug("traceContextQueue#poll exception");
}
}
- if (contexts.size() > 0) {
- AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
- traceExecutor.submit(request);
- } else if (AsyncTraceDispatcher.this.stopped) {
+
+ // NOTE: send the data of every traceDataSegment whose first TraceTransferBean
+ // has been waiting for at least waitTimeThresholdMil
+ sendDataByTimeThreshold();
+
+ if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
- }
- class AsyncAppenderRequest implements Runnable {
- List<TraceContext> contextList;
+ private void sendDataByTimeThreshold() {
+ long now = System.currentTimeMillis();
+ for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+ if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
+ taskInfo.sendAllData();
+ }
+ }
+ }
- public AsyncAppenderRequest(final List<TraceContext> contextList) {
- if (contextList != null) {
- this.contextList = contextList;
- } else {
- this.contextList = new ArrayList<TraceContext>(1);
+ private String getTraceTopicName(String regionId) {
+ AccessChannel accessChannel = AsyncTraceDispatcher.this.getAccessChannel();
+ if (AccessChannel.CLOUD == accessChannel) {
+ return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
+
+ return AsyncTraceDispatcher.this.getTraceTopicName();
}
+ }
- @Override
- public void run() {
- sendTraceData(contextList);
+ class TraceDataSegment {
+ private long firstBeanAddTime;
+ private int currentMsgSize;
+ private final String traceTopicName;
+ private final String regionId;
+ private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();
+
+ TraceDataSegment(String traceTopicName, String regionId) {
+ this.traceTopicName = traceTopicName;
+ this.regionId = regionId;
}
- public void sendTraceData(List<TraceContext> contextList) {
- Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
- for (TraceContext context : contextList) {
- if (context.getTraceBeans().isEmpty()) {
- continue;
- }
- // Topic value corresponding to original message entity content
- String topic = context.getTraceBeans().get(0).getTopic();
- String regionId = context.getRegionId();
- // Use original message entity's topic as key
- String key = topic;
- if (!StringUtils.isBlank(regionId)) {
- key = key + TraceConstants.CONTENT_SPLITOR + regionId;
- }
- List<TraceTransferBean> transBeanList = transBeanMap.get(key);
- if (transBeanList == null) {
- transBeanList = new ArrayList<TraceTransferBean>();
- transBeanMap.put(key, transBeanList);
- }
- TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
- transBeanList.add(traceData);
- }
- for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
- String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
- String dataTopic = entry.getKey();
- String regionId = null;
- if (key.length > 1) {
- dataTopic = key[0];
- regionId = key[1];
- }
- flushData(entry.getValue(), dataTopic, regionId);
+ public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
+ initFirstBeanAddTime();
+ this.traceTransferBeanList.add(traceTransferBean);
+ this.currentMsgSize += traceTransferBean.getTransData().length();
+ if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
+ List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+ AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
+ traceExecutor.submit(asyncDataSendTask);
+
+ this.clear();
}
}
- /**
- * Batch sending data actually
- */
- private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
- if (transBeanList.size() == 0) {
+ public void sendAllData() {
+ if (this.traceTransferBeanList.isEmpty()) {
return;
}
- // Temporary buffer
+ List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
+ AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
+ traceExecutor.submit(asyncDataSendTask);
+
+ this.clear();
+ }
+
+ private void initFirstBeanAddTime() {
+ if (firstBeanAddTime == 0) {
+ firstBeanAddTime = System.currentTimeMillis();
+ }
+ }
+
+ private void clear() {
+ this.firstBeanAddTime = 0;
+ this.currentMsgSize = 0;
+ this.traceTransferBeanList.clear();
+ }
+ }
+
+
+ class AsyncDataSendTask implements Runnable {
+ private final String traceTopicName;
+ private final String regionId;
+ private final List<TraceTransferBean> traceTransferBeanList;
+
+ public AsyncDataSendTask(String traceTopicName, String regionId, List<TraceTransferBean> traceTransferBeanList) {
+ this.traceTopicName = traceTopicName;
+ this.regionId = regionId;
+ this.traceTransferBeanList = traceTransferBeanList;
+ }
+
+ @Override
+ public void run() {
StringBuilder buffer = new StringBuilder(1024);
- int count = 0;
Set<String> keySet = new HashSet<String>();
-
- for (TraceTransferBean bean : transBeanList) {
- // Keyset of message trace includes msgId of or original message
+ for (TraceTransferBean bean : traceTransferBeanList) {
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
- count++;
- // Ensure that the size of the package should not exceed the upper limit.
- if (buffer.length() >= traceProducer.getMaxMessageSize()) {
- sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
- // Clear temporary buffer after finishing
- buffer.delete(0, buffer.length());
- keySet.clear();
- count = 0;
- }
- }
- if (count > 0) {
- sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
}
- transBeanList.clear();
+ sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
}
/**
@@ -357,12 +385,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
+ * @param traceTopic the topic which message trace data will send to
*/
- private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
- String traceTopic = traceTopicName;
- if (AccessChannel.CLOUD == accessChannel) {
- traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
- }
+ private void sendTraceDataByMQ(Set<String> keySet, final String data, String traceTopic) {
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of the original message
message.setKeys(keySet);