You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 09:18:27 UTC
[incubator-inlong] branch INLONG-1739 updated: [INLONG-1765] Optimize the realization of class CliProducer (#1766)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-1739 by this push:
new a7d91ae [INLONG-1765] Optimize the realization of class CliProducer (#1766)
a7d91ae is described below
commit a7d91ae0d0bd838d02341e81c562e9a7bfeaf014
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 17:18:19 2021 +0800
[INLONG-1765] Optimize the realization of class CliProducer (#1766)
---
.../tubemq/server/tools/cli/CliProducer.java | 35 +++++-----------------
1 file changed, 7 insertions(+), 28 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index fbf19fd..0ac63a5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -19,7 +19,6 @@ package org.apache.inlong.tubemq.server.tools.cli;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,7 @@ public class CliProducer extends CliAbstractBase {
// sent data content
private static byte[] sentData;
private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
- private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
+ private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
// cli parameters
@@ -181,21 +180,13 @@ public class CliProducer extends CliAbstractBase {
// initial tubemq client order by caller required
public void initTask() throws Exception {
- // initial sent data
- sentData = MixedUtils.buildTestData(msgDataSize);
// initial client configure
TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
clientConfig.setRpcTimeoutMs(rpcTimeoutMs);
+ // initial sent data
+ sentData = MixedUtils.buildTestData(msgDataSize);
// initial topic send round
- for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
- if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
- } else {
- for (String filter : entry.getValue()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
- }
- }
- }
+ topicSendRounds = MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
// initial send thread service
sendExecutorService =
Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() {
@@ -265,13 +256,9 @@ public class CliProducer extends CliAbstractBase {
while (msgCount < 0 || sentCount < msgCount) {
roundIndex = (int) (sentCount++ % topicAndCondCnt);
try {
- long millis = System.currentTimeMillis();
Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.getF0(), sentData);
- if (target.getF1() != null) {
- // if include filter, add filter item
- message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
- }
+ Message message = MixedUtils.buildMessage(
+ target.getF0(), target.getF1(), sentData, sentCount, sdf);
// use sync or async process
if (syncProduction) {
MessageSentResult procResult =
@@ -293,15 +280,7 @@ public class CliProducer extends CliAbstractBase {
// Limit sending flow control to avoid frequent errors
// caused by too many inflight messages being sent
if (!withoutDelay) {
- if (sentCount % 5000 == 0) {
- ThreadUtils.sleep(3000);
- } else if (sentCount % 4000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (sentCount % 2000 == 0) {
- ThreadUtils.sleep(800);
- } else if (sentCount % 1000 == 0) {
- ThreadUtils.sleep(400);
- }
+ MixedUtils.coolSending(sentCount);
}
}
// finished, close client