You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 09:18:27 UTC

[incubator-inlong] branch INLONG-1739 updated: [INLONG-1765] Optimize the realization of class CliProducer (#1766)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-1739 by this push:
     new a7d91ae  [INLONG-1765] Optimize the realization of class CliProducer (#1766)
a7d91ae is described below

commit a7d91ae0d0bd838d02341e81c562e9a7bfeaf014
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 17:18:19 2021 +0800

    [INLONG-1765] Optimize the realization of class CliProducer (#1766)
---
 .../tubemq/server/tools/cli/CliProducer.java       | 35 +++++-----------------
 1 file changed, 7 insertions(+), 28 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index fbf19fd..0ac63a5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -19,7 +19,6 @@ package org.apache.inlong.tubemq.server.tools.cli;
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +63,7 @@ public class CliProducer extends CliAbstractBase {
     // sent data content
     private static byte[] sentData;
     private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
-    private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
+    private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
     private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
     private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
     // cli parameters
@@ -181,21 +180,13 @@ public class CliProducer extends CliAbstractBase {
 
     // initial tubemq client order by caller required
     public void initTask() throws Exception {
-        // initial sent data
-        sentData = MixedUtils.buildTestData(msgDataSize);
         // initial client configure
         TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
         clientConfig.setRpcTimeoutMs(rpcTimeoutMs);
+        // initial sent data
+        sentData = MixedUtils.buildTestData(msgDataSize);
         // initial topic send round
-        for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
-            } else {
-                for (String filter : entry.getValue()) {
-                    topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
-                }
-            }
-        }
+        topicSendRounds = MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
         // initial send thread service
         sendExecutorService =
                 Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() {
@@ -265,13 +256,9 @@ public class CliProducer extends CliAbstractBase {
             while (msgCount < 0 || sentCount < msgCount) {
                 roundIndex = (int) (sentCount++ % topicAndCondCnt);
                 try {
-                    long millis = System.currentTimeMillis();
                     Tuple2<String, String> target = topicSendRounds.get(roundIndex);
-                    Message message = new Message(target.getF0(), sentData);
-                    if (target.getF1() != null) {
-                        // if include filter, add filter item
-                        message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
-                    }
+                    Message message = MixedUtils.buildMessage(
+                            target.getF0(), target.getF1(), sentData, sentCount, sdf);
                     // use sync or async process
                     if (syncProduction) {
                         MessageSentResult procResult =
@@ -293,15 +280,7 @@ public class CliProducer extends CliAbstractBase {
                 // Limit sending flow control to avoid frequent errors
                 // caused by too many inflight messages being sent
                 if (!withoutDelay) {
-                    if (sentCount % 5000 == 0) {
-                        ThreadUtils.sleep(3000);
-                    } else if (sentCount % 4000 == 0) {
-                        ThreadUtils.sleep(2000);
-                    } else if (sentCount % 2000 == 0) {
-                        ThreadUtils.sleep(800);
-                    } else if (sentCount % 1000 == 0) {
-                        ThreadUtils.sleep(400);
-                    }
+                    MixedUtils.coolSending(sentCount);
                 }
             }
             // finished, close client