You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:07 UTC
[incubator-tubemq] 09/49: [TUBEMQ-444]Add consume and produce Cli commands (#343)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 6ce60a894b1bed29d8bd1602d936cf9c32581b77
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Dec 9 15:38:34 2020 +0800
[TUBEMQ-444]Add consume and produce Cli commands (#343)
Co-authored-by: gosonzhang <go...@tencent.com>
---
...nsumer-perf-test.sh => tubemq-consumer-test.sh} | 2 +-
...oducer-perf-test.sh => tubemq-producer-test.sh} | 2 +-
.../apache/tubemq/corebase/TErrCodeConstants.java | 6 +
.../apache/tubemq/corebase/utils/MixedUtils.java | 59 +++
tubemq-example/pom.xml | 10 -
tubemq-example/src/main/assembly/assembly.xml | 3 -
.../apache/tubemq/example/ArgsParserHelper.java | 48 ---
.../tubemq/example/MAMessageProducerExample.java | 100 ++----
.../tubemq/example/MessageConsumerExample.java | 125 +++----
.../tubemq/server/common/fielddef/CliArgDef.java | 70 ++--
.../tubemq/server/tools/cli/CliAbstractBase.java | 76 ++++
.../tubemq/server/tools/cli/CliConsumer.java | 394 +++++++++++++++++++++
.../tubemq/server/tools/cli/CliProducer.java | 385 ++++++++++++++++++++
13 files changed, 1037 insertions(+), 243 deletions(-)
diff --git a/bin/tubemq-consumer-perf-test.sh b/bin/tubemq-consumer-test.sh
similarity index 94%
rename from bin/tubemq-consumer-perf-test.sh
rename to bin/tubemq-consumer-test.sh
index 54189a2..23292fb 100644
--- a/bin/tubemq-consumer-perf-test.sh
+++ b/bin/tubemq-consumer-test.sh
@@ -37,4 +37,4 @@ if [ -z "$BASE_DIR" ] ; then
#echo "TubeMQ master is at $BASE_DIR"
fi
source $BASE_DIR/bin/env.sh
-$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample $@
+$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliConsumer $@
diff --git a/bin/tubemq-producer-perf-test.sh b/bin/tubemq-producer-test.sh
similarity index 94%
rename from bin/tubemq-producer-perf-test.sh
rename to bin/tubemq-producer-test.sh
index ed76c3a..0f96033 100644
--- a/bin/tubemq-producer-perf-test.sh
+++ b/bin/tubemq-producer-test.sh
@@ -37,4 +37,4 @@ if [ -z "$BASE_DIR" ] ; then
#echo "TubeMQ master is at $BASE_DIR"
fi
source $BASE_DIR/bin/env.sh
-$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample $@
+$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliProducer $@
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java
index 441af38..8bdafa0 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java
@@ -18,6 +18,9 @@
package org.apache.tubemq.corebase;
+import java.util.Arrays;
+import java.util.List;
+
public class TErrCodeConstants {
public static final int SUCCESS = 200;
public static final int NOT_READY = 201;
@@ -44,4 +47,7 @@ public class TErrCodeConstants {
public static final int SERVICE_UNAVAILABLE = 503;
public static final int INTERNAL_SERVER_ERROR_MSGSET_NULL = 510;
+ public static final List<Integer> IGNORE_ERROR_SET =
+ Arrays.asList(BAD_REQUEST, NOT_FOUND, ALL_PARTITION_FROZEN,
+ NO_PARTITION_ASSIGNED, ALL_PARTITION_WAITING, ALL_PARTITION_INUSE);
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
index 17ccb5e..bfbedad 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
@@ -17,6 +17,16 @@
package org.apache.tubemq.corebase.utils;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.tubemq.corebase.TokenConstants;
+
+
+
public class MixedUtils {
// java version cache
private static String javaVersion = "";
@@ -33,4 +43,53 @@ public class MixedUtils {
return javaVersion.substring(0, maxLen);
}
}
+
+ /**
+ * parse topic Parameter with format topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]
+ * topicParam->set(filterCond) map
+ * @param topicParam - composite string
+ * @return - map of topic->set(filterCond)
+ */
+ public static Map<String, TreeSet<String>> parseTopicParam(String topicParam) {
+ Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
+ if (TStringUtils.isBlank(topicParam)) {
+ return topicAndFiltersMap;
+ }
+ String[] topicFilterStrs = topicParam.split(TokenConstants.ARRAY_SEP);
+ for (String topicFilterStr : topicFilterStrs) {
+ if (TStringUtils.isBlank(topicFilterStr)) {
+ continue;
+ }
+ String[] topicFilters = topicFilterStr.split(TokenConstants.ATTR_SEP);
+ if (TStringUtils.isBlank(topicFilters[0])) {
+ continue;
+ }
+ TreeSet<String> filterSet = new TreeSet<>();
+ if (topicFilters.length > 1
+ && TStringUtils.isNotBlank(topicFilters[1])) {
+ String[] filterItems = topicFilters[1].split(TokenConstants.LOG_SEG_SEP);
+ for (String filterItem : filterItems) {
+ if (TStringUtils.isBlank(filterItem)) {
+ continue;
+ }
+ filterSet.add(filterItem.trim());
+ }
+ }
+ topicAndFiltersMap.put(topicFilters[0].trim(), filterSet);
+ }
+ return topicAndFiltersMap;
+ }
+
+ public static byte[] buildTestData(int bodySize) {
+ final byte[] transmitData =
+ StringUtils.getBytesUtf8("This is a test data!");
+ final ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize);
+ while (dataBuffer.hasRemaining()) {
+ int offset = dataBuffer.arrayOffset();
+ dataBuffer.put(transmitData, offset,
+ Math.min(dataBuffer.remaining(), transmitData.length));
+ }
+ dataBuffer.flip();
+ return dataBuffer.array();
+ }
}
diff --git a/tubemq-example/pom.xml b/tubemq-example/pom.xml
index 319acab..9ab6775 100644
--- a/tubemq-example/pom.xml
+++ b/tubemq-example/pom.xml
@@ -65,16 +65,6 @@
<groupId>org.apache.tubemq</groupId>
<artifactId>tubemq-client</artifactId>
</dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml
index 596af24..11edd32 100644
--- a/tubemq-example/src/main/assembly/assembly.xml
+++ b/tubemq-example/src/main/assembly/assembly.xml
@@ -32,9 +32,6 @@
<directory>../</directory>
<includes>
<include>./conf/tools.log4j.properties</include>
- <include>./bin/tubemq-consumer-perf-test.sh</include>
- <include>./bin/tubemq-producer-perf-test.sh</include>
- <include>./bin/env.sh</include>
<include>LICENSE</include>
<include>NOTICE</include>
<include>DISCLAIMER-WIP</include>
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
deleted file mode 100644
index c507ae4..0000000
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.example;
-
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-
-public class ArgsParserHelper {
-
- /**
- * Print help information and exit.
- *
- * @param opts - options
- */
- public static void help(String commandName, Options opts) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(commandName, opts);
- System.exit(0);
- }
-
- /**
- * Init common options when parsing args.
- * @return - options
- */
- public static Options initCommonOptions() {
- Options options = new Options();
- options.addOption(null, "help", false, "show help");
- options.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000");
- options.addOption(null, "topic", true, "topic list, topic1,topic2 or "
- + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)");
- return options;
- }
-}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index 9c76ce1..f76b077 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -30,13 +30,8 @@ import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
@@ -64,7 +59,7 @@ public class MAMessageProducerExample {
private static final int SESSION_FACTORY_NUM = 10;
private static Set<String> topicSet;
- private static int batchCount;
+ private static int msgCount;
private static int producerCount;
private static byte[] sendData;
@@ -94,72 +89,45 @@ public class MAMessageProducerExample {
}
}
- /**
- * Init options
- *
- * @return options
- */
- public static Options initOptions() {
- Options options = ArgsParserHelper.initCommonOptions();
- options.addOption(null, "batch-size", true, "number of messages in single batch, default is 100000");
- options.addOption(null, "max-batch", true, "max batch number, default is 1024");
- options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100");
- return options;
- }
-
public static void main(String[] args) {
- Options options = null;
- try {
- CommandLineParser parser = new DefaultParser();
- options = initOptions();
- CommandLine cl = parser.parse(options, args);
- if (cl != null) {
- final String masterHostAndPort = cl.getOptionValue("master-list");
- final String topics = cl.getOptionValue("topic");
- final List<String> topicList = Arrays.asList(topics.split(","));
- topicSet = new TreeSet<>(topicList);
-
- batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000"));
- int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024"));
- producerCount = Math.min(Integer.parseInt(cl.getOptionValue(
- "thread-num", "1")), MAX_PRODUCER_NUM);
- logger.info("MAMessageProducerExample.main started...");
- final byte[] transmitData = StringUtils
- .getBytesUtf8("This is a test message from multi-session factory.");
- final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize);
-
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(transmitData, offset,
- Math.min(dataBuffer.remaining(), transmitData.length));
- }
+ final String masterHostAndPort = args[0];
- dataBuffer.flip();
- sendData = dataBuffer.array();
+ final String topics = args[1];
+ final List<String> topicList = Arrays.asList(topics.split(","));
- try {
- MAMessageProducerExample messageProducer = new MAMessageProducerExample(
- masterHostAndPort);
+ topicSet = new TreeSet<>(topicList);
- messageProducer.startService();
+ msgCount = Integer.parseInt(args[2]);
+ producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
- while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) {
- TimeUnit.MILLISECONDS.sleep(1000);
- }
- messageProducer.producerMap.clear();
- messageProducer.shutdown();
+ logger.info("MAMessageProducerExample.main started...");
- } catch (TubeClientException e) {
- logger.error("TubeClientException: ", e);
- } catch (Throwable e) {
- logger.error("Throwable: ", e);
- }
- }
- } catch (Exception ex) {
- logger.error(ex.getMessage());
- if (options != null) {
- ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options);
+ final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
+ final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
+
+ while (dataBuffer.hasRemaining()) {
+ int offset = dataBuffer.arrayOffset();
+ dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
+ }
+
+ dataBuffer.flip();
+ sendData = dataBuffer.array();
+
+ try {
+ MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
+
+ messageProducer.startService();
+
+ while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
+ Thread.sleep(1000);
}
+ messageProducer.producerMap.clear();
+ messageProducer.shutdown();
+
+ } catch (TubeClientException e) {
+ logger.error("TubeClientException: ", e);
+ } catch (Throwable e) {
+ logger.error("Throwable: ", e);
}
}
@@ -205,7 +173,7 @@ public class MAMessageProducerExample {
} catch (Throwable t) {
logger.error("publish exception: ", t);
}
- for (int i = 0; i < batchCount; i++) {
+ for (int i = 0; i < msgCount; i++) {
long millis = System.currentTimeMillis();
for (String topic : topicSet) {
try {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index 3b99943..d9aeb8a 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -26,10 +26,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -63,45 +59,29 @@ public final class MessageConsumerExample {
private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
private final PushMessageConsumer messageConsumer;
+ private final MessageSessionFactory messageSessionFactory;
- public MessageConsumerExample(String masterHostAndPort, String group,
- int fetchCount, boolean isFromBegin) throws Exception {
+ public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- if (isFromBegin) {
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
- } else {
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
- }
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
if (fetchCount > 0) {
consumerConfig.setPushFetchThreadCnt(fetchCount);
}
- MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
}
- /**
- * Init options
- * @return options
- */
- public static Options initOptions() {
-
- Options options = ArgsParserHelper.initCommonOptions();
- options.addOption(null, "batch-size", true, "max number of fetching message in one batch");
- options.addOption(null, "thread-num", true, "thread number of consumers");
- options.addOption(null, "group", true, "consumer group");
- options.addOption(null, "from-begin", false, "default is consuming from latest, "
- + "if option is clarified, then consume from begin");
- return options;
-
- }
+ public static void main(String[] args) {
+ final String masterHostAndPort = args[0];
+ final String topics = args[1];
+ final String group = args[2];
+ final int consumerCount = Integer.parseInt(args[3]);
+ int fetchCount = -1;
+ if (args.length > 5) {
+ fetchCount = Integer.parseInt(args[4]);
+ }
+ final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
- /**
- * init topic->set(tid) map
- * @param topics - topics string
- * @return - map of topic->set(tid)
- */
- private static Map<String, TreeSet<String>> initTopicList(String topics) {
- Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
String[] topicTidsList = topics.split(",");
for (String topicTids : topicTidsList) {
String[] topicTidStr = topicTids.split(":");
@@ -115,60 +95,35 @@ public final class MessageConsumerExample {
}
topicTidsMap.put(topicTidStr[0], tids);
}
- return topicTidsMap;
- }
+ final int startFetchCount = fetchCount;
+ final ExecutorService executorService = Executors.newFixedThreadPool(fetchCount);
+ for (int i = 0; i < consumerCount; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ MessageConsumerExample messageConsumer = new MessageConsumerExample(
+ masterHostAndPort,
+ group,
+ startFetchCount
+ );
+ messageConsumer.subscribe(topicTidsMap);
+ } catch (Exception e) {
+ logger.error("Create consumer failed!", e);
+ }
+ }
+ });
+ }
+ final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
+ statisticThread.start();
- public static void main(String[] args) {
- Options options = null;
+ executorService.shutdown();
try {
- CommandLineParser parser = new DefaultParser();
- options = initOptions();
- CommandLine cl = parser.parse(options, args);
- if (cl != null) {
- final String masterHostAndPort = cl.getOptionValue("master-list");
- final Map<String, TreeSet<String>> topicTidsMap = initTopicList(
- cl.getOptionValue("topic"));
- final String group = cl.getOptionValue("group");
- int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1"));
- final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1"));
- final boolean isFromBegin = cl.hasOption("from-begin");
- ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
- for (int i = 0; i < threadNum; i++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- MessageConsumerExample messageConsumer = new MessageConsumerExample(
- masterHostAndPort,
- group,
- fetchCount,
- isFromBegin
- );
- messageConsumer.subscribe(topicTidsMap);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
- }
- final Thread statisticThread = new Thread(msgRecvStats,
- "Received Statistic Thread");
- statisticThread.start();
-
- executorService.shutdown();
- try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
- }
- msgRecvStats.stopStats();
- }
- } catch (Exception ex) {
- logger.error(ex.getMessage(), ex.getMessage());
- if (options != null) {
- ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options);
- }
+ executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Thread Pool shutdown has been interrupted!");
}
+ msgRecvStats.stopStats();
}
public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
index b2d9327..abb2e2a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
@@ -27,64 +27,76 @@ public enum CliArgDef {
HELP("h", "help", "Print usage information."),
VERSION("v", "version", "Display TubeMQ version."),
- MASTERSERVER("master-servers", "master-servers",
+ MASTERSERVER(null, "master-servers",
"String: format is master1_ip:port[,master2_ip:port]",
"The master address(es) to connect to."),
- MASTERURL("master-url", "master-url",
+ MASTERURL(null, "master-url",
"String: format is http://master_ip:master_webport/",
"Master Service URL to which to connect.(default: http://localhost:8080/)"),
- BROKERURL("broker-url", "broker-url",
+ BROKERURL(null, "broker-url",
"String: format is http://broker_ip:broker_webport/",
"Broker Service URL to which to connect.(default: http://localhost:8081/)"),
- MESSAGES("messages", "messages",
+ MESSAGES(null, "messages",
"Long: count",
"The number of messages to send or consume, If not set, production or consumption is continual."),
- MSGDATASIZE("msg-data-size", "message-data-size",
- "Int: message size",
+ MSGDATASIZE(null, "message-data-size",
+ "Int: message size,(0, 1024 * 1024)",
"message's data size in bytes. Note that you must provide exactly"
+ " one of --msg-data-size or --payload-file."),
- PAYLOADFILE("payload-file", "payload-file",
+ PAYLOADFILE(null, "payload-file",
"String: payload file path",
"file to read the message payloads from. This works only for"
+ " UTF-8 encoded text files. Payloads will be read from this"
+ " file and a payload will be randomly selected when sending"
+ " messages. Note that you must provide exactly one"
+ " of --msg-data-size or --payload-file."),
- PAYLOADDELIM("payload-delimiter", "payload-delimiter",
+ PAYLOADDELIM(null, "payload-delimiter",
"String: payload data's delimiter",
"provides delimiter to be used when --payload-file is provided."
+ " Defaults to new line. Note that this parameter will be"
+ " ignored if --payload-file is not provided. (default: \\n)"),
PRDTOPIC("topic", "topicName",
- "String: topic, format is topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]",
+ "String: topic, format is topic_1[,topic_2[:filterCond_2.1[\\;filterCond_2.2]]]",
"The topic(s) to produce messages to."),
CNSTOPIC("topic", "topicName",
- "String: topic, format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]",
+ "String: topic, format is topic_1[[:filterCond_1.1[\\;filterCond_1.2]][,topic_2]]",
"The topic(s) to consume on."),
- RPCTIMEOUT("timeout", "timeout",
+ RPCTIMEOUT(null, "rpc-timeout",
"Long: milliseconds",
"The maximum duration between request and response in milliseconds. (default: 10000)"),
+ CONNREUSE(null, "conn-reuse",
+ "bool: true or false",
+ "Different clients reuse TCP connections. (default: true)"),
GROUP("group", "groupName",
"String: consumer group",
- "The consumer group name of the consumer."),
- CLIENTCOUNT("client-num", "client-num",
- "Int: client count",
- "Number of consumers to started."),
- PULLMODEL("pull-model", "pull-model",
- "Pull consumption model."),
- PUSHMODEL("push-model", "push-model",
- "Push consumption model."),
- FETCHTHREADS("num-fetch-threads", "num-fetch-threads",
- "Integer: count",
+ "The consumer group name of the consumer. (default: test_consume)"),
+ CLIENTCOUNT(null, "client-count",
+ "Int: client count, [1, 100]",
+ "Number of producers or consumers to started."),
+ PUSHCONSUME(null, "consume-push",
+ "Push consumption action.(default: pull)"),
+ FETCHTHREADS(null, "num-fetch-threads",
+ "Integer: count, [1,100]",
"Number of fetch threads, default: num of cpu count."),
- FROMLATEST("from-latest", "from-latest",
- "Start to consume from the latest message present in the log."),
- FROMBEGINNING("from-beginning", "from-beginning",
- "If the consumer does not already have an established offset to consume from,"
- + " start with the earliest message present in the log rather than the latest message."),
- OUTPUTINTERVAL("output-interval", "output-interval",
- "Integer: interval_ms",
- "Interval in milliseconds at which to print progress info. (default: 5000)");
+ SENDTHREADS(null, "num-send-threads",
+ "Integer: count, [1,200]",
+ "Number of send message threads, default: num of cpu count."),
+ CONSUMEPOS(null, "consume-position",
+ "Integer: [-1,0, 1]",
+ "Set the start position of the consumer group. The value can be [-1, 0, 1]."
+ + " Default value is 0. -1: Start from 0 for the first time."
+ + " Otherwise start from last consume position."
+ + " 0: Start from the latest position for the first time."
+ + " Otherwise start from last consume position."
+ + " 1: Start from the latest consume position."),
+ OUTPUTINTERVAL(null, "output-interval",
+ "Integer: interval_ms, [5000, +)",
+ "Interval in milliseconds at which to print progress info. (default: 5000)"),
+ SYNCPRODUCE(null, "sync-produce",
+ "Synchronous production. (default: false)"),
+ WITHOUTDELAY(null, "without-delay",
+ "Production without delay. (default: false)");
+
CliArgDef(String opt, String longOpt, String optDesc) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java
new file mode 100644
index 0000000..c0903f2
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.tools.cli;
+
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.tubemq.server.common.TubeServerVersion;
+import org.apache.tubemq.server.common.fielddef.CliArgDef;
+
+
+public abstract class CliAbstractBase {
+
+ protected final String commandName;
+ protected Options options = new Options();
+ protected CommandLineParser parser = new DefaultParser();
+ private HelpFormatter formatter = new HelpFormatter();
+
+ public CliAbstractBase(String commandName) {
+ this.commandName = commandName;
+ addCommandOption(CliArgDef.HELP);
+ addCommandOption(CliArgDef.VERSION);
+ formatter.setWidth(500);
+ }
+
+ /**
+ * Print help information and exit.
+ *
+ */
+ public void help() {
+ formatter.printHelp(commandName, options);
+ System.exit(0);
+ }
+
+ /**
+ * Print tubemq server version.
+ *
+ */
+ public void version() {
+ System.out.println("TubeMQ " + TubeServerVersion.BROKER_VERSION);
+ System.exit(0);
+ }
+
+ public void addCommandOption(CliArgDef cliArgDef) {
+ Option option = new Option(cliArgDef.opt,
+ cliArgDef.longOpt, cliArgDef.hasArg, cliArgDef.optDesc);
+ if (cliArgDef.hasArg) {
+ option.setArgName(cliArgDef.argDesc);
+ }
+ options.addOption(option);
+ }
+
+
+ protected abstract void initCommandOptions();
+
+
+ public abstract boolean parseParams(String[] args) throws Exception;
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
new file mode 100644
index 0000000..666e6f9
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.tools.cli;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.tubemq.client.common.PeerInfo;
+import org.apache.tubemq.client.config.ConsumerConfig;
+import org.apache.tubemq.client.consumer.ConsumePosition;
+import org.apache.tubemq.client.consumer.ConsumerResult;
+import org.apache.tubemq.client.consumer.MessageConsumer;
+import org.apache.tubemq.client.consumer.MessageV2Listener;
+import org.apache.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.tubemq.client.exception.TubeClientException;
+import org.apache.tubemq.client.factory.MessageSessionFactory;
+import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TErrCodeConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.server.common.fielddef.CliArgDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * This class implements the command-line (CLI) consumer tool.
+ *
+ *
+ */
+public class CliConsumer extends CliAbstractBase {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(CliConsumer.class);
+ // statistic data index
+ private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
+ // subscribed topics and filters, plus the created session factories and consumers
+ private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
+ private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
+ private final Map<MessageConsumer, TupleValue> consumerMap = new HashMap<>();
+ // cli parameters
+ private String masterServers;
+ private String groupName = "test_consume";
+ private ConsumePosition consumePos =
+ ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
+ private long msgCount = TBaseConstants.META_VALUE_UNDEFINED;
+ private long rpcTimeoutMs = TBaseConstants.META_VALUE_UNDEFINED;
+ private boolean reuseConn = false;
+ private int clientCount = 1;
+ private int fetchThreadCnt =
+ Runtime.getRuntime().availableProcessors();
+ private long printIntervalMs = 5000;
+ private boolean isPushConsume = false;
+
+ private boolean isStarted = false;
+
+
+ public CliConsumer() {
+ super("tubemq-consumer-test.sh");
+ initCommandOptions();
+ }
+
+ /**
+ * Init command options
+ */
+ protected void initCommandOptions() {
+ // add the cli required parameters
+ addCommandOption(CliArgDef.MASTERSERVER);
+ addCommandOption(CliArgDef.MESSAGES);
+ addCommandOption(CliArgDef.CNSTOPIC);
+ addCommandOption(CliArgDef.RPCTIMEOUT);
+ addCommandOption(CliArgDef.GROUP);
+ addCommandOption(CliArgDef.CONNREUSE);
+ addCommandOption(CliArgDef.PUSHCONSUME);
+ addCommandOption(CliArgDef.CONSUMEPOS);
+ addCommandOption(CliArgDef.FETCHTHREADS);
+ addCommandOption(CliArgDef.CLIENTCOUNT);
+ addCommandOption(CliArgDef.OUTPUTINTERVAL);
+ addCommandOption(CliArgDef.WITHOUTDELAY);
+ }
+
+    /**
+     * Parses the command-line arguments into the consumer settings.
+     *
+     * The master address and the topic option are mandatory; every other
+     * option keeps its field default when absent or blank.
+     *
+     * @param args  the raw command-line arguments
+     * @return  always true when parsing completes
+     * @throws Exception  if a mandatory option is missing, a numeric option is
+     *                    not a number, or a value is out of its allowed range
+     */
+    public boolean parseParams(String[] args) throws Exception {
+        // parse parameters and check value
+        CommandLine cli = parser.parse(options, args);
+        if (cli == null) {
+            throw new ParseException("Parse args failure");
+        }
+        if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+            version();
+        }
+        if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+            help();
+        }
+        masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
+        if (TStringUtils.isBlank(masterServers)) {
+            throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
+        }
+        String topicStr = cli.getOptionValue(CliArgDef.CNSTOPIC.longOpt);
+        if (TStringUtils.isBlank(topicStr)) {
+            throw new Exception(CliArgDef.CNSTOPIC.longOpt + " is required!");
+        }
+        topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(topicStr));
+        if (topicAndFiltersMap.isEmpty()) {
+            throw new Exception("Invalid " + CliArgDef.CNSTOPIC.longOpt + " parameter value!");
+        }
+        String msgCntStr = cli.getOptionValue(CliArgDef.MESSAGES.longOpt);
+        if (TStringUtils.isNotBlank(msgCntStr)) {
+            msgCount = Long.parseLong(msgCntStr);
+        }
+        String groupNameStr = cli.getOptionValue(CliArgDef.GROUP.longOpt);
+        if (TStringUtils.isNotBlank(groupNameStr)) {
+            groupName = groupNameStr;
+        }
+        String reuseConnStr = cli.getOptionValue(CliArgDef.CONNREUSE.longOpt);
+        if (TStringUtils.isNotBlank(reuseConnStr)) {
+            reuseConn = Boolean.parseBoolean(reuseConnStr);
+        }
+        String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
+        if (TStringUtils.isNotBlank(rpcTimeoutStr)) {
+            rpcTimeoutMs = Long.parseLong(rpcTimeoutStr);
+        }
+        String clientCntStr = cli.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt);
+        if (TStringUtils.isNotBlank(clientCntStr)) {
+            clientCount = Integer.parseInt(clientCntStr);
+            // without at least one client nothing is consumed and the
+            // required total computed from clientCount becomes meaningless
+            if (clientCount < 1) {
+                throw new Exception("Invalid "
+                        + CliArgDef.CLIENTCOUNT.longOpt
+                        + " parameter value!");
+            }
+        }
+        String printIntMsStr = cli.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt);
+        if (TStringUtils.isNotBlank(printIntMsStr)) {
+            printIntervalMs = Long.parseLong(printIntMsStr);
+            if (printIntervalMs < 5000) {
+                throw new Exception("Invalid "
+                        + CliArgDef.OUTPUTINTERVAL.longOpt
+                        + " parameter value!");
+            }
+        }
+        String consumePosStr = cli.getOptionValue(CliArgDef.CONSUMEPOS.longOpt);
+        if (TStringUtils.isNotBlank(consumePosStr)) {
+            // positive: always from max offset, negative: from first offset,
+            // zero: from latest offset
+            int tmpPosId = Integer.parseInt(consumePosStr);
+            if (tmpPosId > 0) {
+                consumePos = ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS;
+            } else if (tmpPosId < 0) {
+                consumePos = ConsumePosition.CONSUMER_FROM_FIRST_OFFSET;
+            } else {
+                consumePos = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
+            }
+        }
+        if (cli.hasOption(CliArgDef.PUSHCONSUME.longOpt)) {
+            isPushConsume = true;
+        }
+        String fetchThreadCntStr = cli.getOptionValue(CliArgDef.FETCHTHREADS.longOpt);
+        if (TStringUtils.isNotBlank(fetchThreadCntStr)) {
+            // clamp the requested thread count into [1, 100]
+            int tmpFetchThreadCnt = Integer.parseInt(fetchThreadCntStr);
+            fetchThreadCnt = Math.max(1, Math.min(tmpFetchThreadCnt, 100));
+        }
+        return true;
+    }
+
+    /**
+     * Creates the session factories and consumers requested on the command line
+     * and subscribes each consumer to every configured topic.
+     *
+     * With connection reuse a single TubeSingleSessionFactory serves all clients;
+     * otherwise each client gets its own TubeMultiSessionFactory. Push consumers
+     * share one listener; each pull consumer gets its own fetch threads, which
+     * are started immediately.
+     *
+     * @throws Exception  if a factory or consumer cannot be created or subscribed
+     */
+    public void initTask() throws Exception {
+        // initial consumer configure
+        ConsumerConfig consumerConfig =
+                new ConsumerConfig(masterServers, groupName);
+        consumerConfig.setRpcTimeoutMs(rpcTimeoutMs);
+        consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
+        consumerConfig.setConsumePosition(consumePos);
+        // The received-message counter is shared by all clients, so the pull
+        // runners must stop at the same overall total that main() waits for
+        // (msgCount * clientCount); a negative msgCount still means "no limit".
+        final long totalMsgCount =
+                (msgCount < 0) ? msgCount : msgCount * clientCount;
+        final DefaultMessageListener msgListener =
+                isPushConsume ? new DefaultMessageListener() : null;
+        // if reuse connection, need use TubeSingleSessionFactory class
+        MessageSessionFactory sharedFactory = null;
+        if (reuseConn) {
+            sharedFactory = new TubeSingleSessionFactory(consumerConfig);
+            this.sessionFactoryList.add(sharedFactory);
+        }
+        // initial consumer object
+        for (int i = 0; i < clientCount; i++) {
+            MessageSessionFactory msgSessionFactory = sharedFactory;
+            if (msgSessionFactory == null) {
+                msgSessionFactory = new TubeMultiSessionFactory(consumerConfig);
+                this.sessionFactoryList.add(msgSessionFactory);
+            }
+            if (isPushConsume) {
+                PushMessageConsumer pushConsumer =
+                        msgSessionFactory.createPushConsumer(consumerConfig);
+                for (Map.Entry<String, TreeSet<String>> entry
+                        : topicAndFiltersMap.entrySet()) {
+                    pushConsumer.subscribe(entry.getKey(), entry.getValue(), msgListener);
+                }
+                pushConsumer.completeSubscribe();
+                // push consumers need no fetch threads
+                consumerMap.put(pushConsumer, null);
+            } else {
+                PullMessageConsumer pullConsumer =
+                        msgSessionFactory.createPullConsumer(consumerConfig);
+                for (Map.Entry<String, TreeSet<String>> entry
+                        : topicAndFiltersMap.entrySet()) {
+                    pullConsumer.subscribe(entry.getKey(), entry.getValue());
+                }
+                pullConsumer.completeSubscribe();
+                consumerMap.put(pullConsumer,
+                        new TupleValue(pullConsumer, totalMsgCount, fetchThreadCnt));
+            }
+        }
+        isStarted = true;
+    }
+
+ public void shutdown() throws Throwable {
+ // stop process
+ ThreadUtils.sleep(20);
+ for (MessageConsumer consumer : consumerMap.keySet()) {
+ consumer.shutdown();
+ }
+ for (MessageSessionFactory messageSessionFactory : sessionFactoryList) {
+ messageSessionFactory.shutdown();
+ }
+ }
+
+
+ private static class TupleValue {
+ public Thread[] fetchRunners = null;
+
+ public TupleValue(PullMessageConsumer consumer, long msgCount, int fetchThreadCnt) {
+ fetchRunners = new Thread[fetchThreadCnt];
+ for (int i = 0; i < fetchRunners.length; i++) {
+ fetchRunners[i] = new Thread(new FetchRequestRunner(consumer, msgCount));
+ fetchRunners[i].setName("_fetch_runner_" + i);
+ }
+ for (Thread thread : fetchRunners) {
+ thread.start();
+ }
+ }
+
+ }
+
+
+    /**
+     * Listener used by push consumers: it only adds the size of each
+     * delivered batch to the shared received-message counter.
+     */
+    private static class DefaultMessageListener implements MessageV2Listener {
+
+        public DefaultMessageListener() {
+        }
+
+        @Override
+        public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
+            // nothing to count for a missing or empty batch
+            if (messages == null || messages.isEmpty()) {
+                return;
+            }
+            TOTAL_COUNTER.addAndGet(messages.size());
+        }
+
+        @Override
+        public void receiveMessages(List<Message> messages) {
+            // deprecated
+        }
+
+        @Override
+        public Executor getExecutor() {
+            // no dedicated executor is supplied
+            return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+
+ // for push consumer process
+ private static class FetchRequestRunner implements Runnable {
+
+ private final PullMessageConsumer messageConsumer;
+ private final long msgConsumeCnt;
+
+ FetchRequestRunner(PullMessageConsumer messageConsumer, long msgConsumeCnt) {
+ this.messageConsumer = messageConsumer;
+ this.msgConsumeCnt = msgConsumeCnt;
+ }
+
+ @Override
+ public void run() {
+ try {
+ do {
+ ConsumerResult result = messageConsumer.getMessage();
+ if (result.isSuccess()) {
+ List<Message> messageList = result.getMessageList();
+ if (messageList != null && !messageList.isEmpty()) {
+ TOTAL_COUNTER.addAndGet(messageList.size());
+ }
+ messageConsumer.confirmConsume(result.getConfirmContext(), true);
+ } else {
+ if (!TErrCodeConstants.IGNORE_ERROR_SET.contains(result.getErrCode())) {
+ logger.info(
+ "Receive messages errorCode is {}, Error message is {}",
+ result.getErrCode(),
+ result.getErrMsg());
+ if (messageConsumer.isShutdown()) {
+ break;
+ }
+ }
+ }
+ if (msgConsumeCnt >= 0) {
+ if (TOTAL_COUNTER.get() >= msgConsumeCnt) {
+ break;
+ }
+ }
+ } while (true);
+ } catch (TubeClientException e) {
+ logger.error("Create consumer failed!", e);
+ }
+ }
+ }
+
+    /**
+     * Entry point of the consumer tool: parses the arguments, starts the
+     * consumers, prints the progress periodically until the required number of
+     * messages has been received, then shuts everything down.
+     *
+     * @param args  the raw command-line arguments
+     */
+    public static void main(String[] args) {
+        CliConsumer cliConsumer = new CliConsumer();
+        try {
+            boolean result = cliConsumer.parseParams(args);
+            if (!result) {
+                throw new Exception("Parse parameters failure!");
+            }
+            cliConsumer.initTask();
+            ThreadUtils.sleep(1000);
+            // both factors are fixed once the parameters are parsed
+            final long requiredCount = cliConsumer.msgCount * cliConsumer.clientCount;
+            // a negative msgCount means "run until the process is stopped"
+            while (cliConsumer.msgCount < 0
+                    || TOTAL_COUNTER.get() < requiredCount) {
+                ThreadUtils.sleep(cliConsumer.printIntervalMs);
+                System.out.println("Required received count VS received message count = "
+                        + requiredCount
+                        + " : " + TOTAL_COUNTER.get());
+            }
+            cliConsumer.shutdown();
+            System.out.println("Finished, received count VS received message count = "
+                    + requiredCount
+                    + " : " + TOTAL_COUNTER.get());
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            // pass the throwable so the log keeps the stack trace even when
+            // the exception carries no message
+            logger.error(ex.getMessage(), ex);
+            // NOTE(review): help() ends the JVM with status 0, so failures are
+            // not visible in the exit code.
+            cliConsumer.help();
+        }
+
+    }
+
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
new file mode 100644
index 0000000..2734ac1
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.tools.cli;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.tubemq.client.config.TubeClientConfig;
+import org.apache.tubemq.client.factory.MessageSessionFactory;
+import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.tubemq.client.producer.MessageProducer;
+import org.apache.tubemq.client.producer.MessageSentCallback;
+import org.apache.tubemq.client.producer.MessageSentResult;
+import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
+import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.server.common.fielddef.CliArgDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class implements the command-line (CLI) producer tool.
+ *
+ *
+ */
+public class CliProducer extends CliAbstractBase {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(CliProducer.class);
+ // statistic data index
+ private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
+ private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
+ private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
+ private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
+ // sent data content
+ private static byte[] sentData;
+ private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
+ private final List<TupleValue> topicSendRounds = new ArrayList<>();
+ private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
+ private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
+ // cli parameters
+ private String masterServers;
+ private long msgCount = TBaseConstants.META_VALUE_UNDEFINED;
+ private boolean useRandData = true;
+ private int msgDataSize = 1000;
+ private String payloadFilePath = null;
+ private String payloadDelim = null;
+ private long rpcTimeoutMs = TBaseConstants.META_VALUE_UNDEFINED;
+ private boolean reuseConn = false;
+ private int clientCount = 1;
+ private int sendThreadCnt = 100;
+ private long printIntervalMs = 5000;
+ private boolean syncProduction = false;
+ private boolean withoutDelay = false;
+ private boolean isStarted = false;
+ private ExecutorService sendExecutorService = null;
+
+
+ public CliProducer() {
+ super("tubemq-producer-test.sh");
+ initCommandOptions();
+ }
+
+ /**
+ * Init command options
+ */
+ protected void initCommandOptions() {
+ // add the cli required parameters
+ addCommandOption(CliArgDef.MASTERSERVER);
+ addCommandOption(CliArgDef.MESSAGES);
+ addCommandOption(CliArgDef.MSGDATASIZE);
+ //addCommandOption(CliArgDef.PAYLOADFILE);
+ //addCommandOption(CliArgDef.PAYLOADDELIM);
+ addCommandOption(CliArgDef.PRDTOPIC);
+ addCommandOption(CliArgDef.RPCTIMEOUT);
+ addCommandOption(CliArgDef.CONNREUSE);
+ addCommandOption(CliArgDef.CLIENTCOUNT);
+ addCommandOption(CliArgDef.OUTPUTINTERVAL);
+ addCommandOption(CliArgDef.SYNCPRODUCE);
+ addCommandOption(CliArgDef.SENDTHREADS);
+ addCommandOption(CliArgDef.WITHOUTDELAY);
+ }
+
+    /**
+     * Parses the command-line arguments into the producer settings.
+     *
+     * The master address and the topic option are mandatory; every other
+     * option keeps its field default when absent or blank.
+     *
+     * @param args  the raw command-line arguments
+     * @return  always true when parsing completes
+     * @throws Exception  if a mandatory option is missing, a numeric option is
+     *                    not a number, or a value is out of its allowed range
+     */
+    public boolean parseParams(String[] args) throws Exception {
+        // parse parameters and check value
+        CommandLine cli = parser.parse(options, args);
+        if (cli == null) {
+            throw new ParseException("Parse args failure");
+        }
+        if (cli.hasOption(CliArgDef.VERSION.longOpt)) {
+            version();
+        }
+        if (cli.hasOption(CliArgDef.HELP.longOpt)) {
+            help();
+        }
+        masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
+        if (TStringUtils.isBlank(masterServers)) {
+            throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
+        }
+        String topicStr = cli.getOptionValue(CliArgDef.PRDTOPIC.longOpt);
+        if (TStringUtils.isBlank(topicStr)) {
+            throw new Exception(CliArgDef.PRDTOPIC.longOpt + " is required!");
+        }
+        topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(topicStr));
+        if (topicAndFiltersMap.isEmpty()) {
+            throw new Exception("Invalid " + CliArgDef.PRDTOPIC.longOpt + " parameter value!");
+        }
+        String msgCntStr = cli.getOptionValue(CliArgDef.MESSAGES.longOpt);
+        if (TStringUtils.isNotBlank(msgCntStr)) {
+            msgCount = Long.parseLong(msgCntStr);
+        }
+        String msgDataSizeStr = cli.getOptionValue(CliArgDef.MSGDATASIZE.longOpt);
+        if (TStringUtils.isNotBlank(msgDataSizeStr)) {
+            msgDataSize = Integer.parseInt(msgDataSizeStr);
+            // a negative payload size can never be built
+            if (msgDataSize < 0) {
+                throw new Exception("Invalid "
+                        + CliArgDef.MSGDATASIZE.longOpt
+                        + " parameter value!");
+            }
+        }
+        String reuseConnStr = cli.getOptionValue(CliArgDef.CONNREUSE.longOpt);
+        if (TStringUtils.isNotBlank(reuseConnStr)) {
+            reuseConn = Boolean.parseBoolean(reuseConnStr);
+        }
+        String sendThreadCntStr = cli.getOptionValue(CliArgDef.SENDTHREADS.longOpt);
+        if (TStringUtils.isNotBlank(sendThreadCntStr)) {
+            // clamp the requested thread count into [1, 200]
+            int tmpThreadCnt = Integer.parseInt(sendThreadCntStr);
+            sendThreadCnt = Math.max(1, Math.min(tmpThreadCnt, 200));
+        }
+        String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
+        if (TStringUtils.isNotBlank(rpcTimeoutStr)) {
+            rpcTimeoutMs = Long.parseLong(rpcTimeoutStr);
+        }
+        String clientCntStr = cli.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt);
+        if (TStringUtils.isNotBlank(clientCntStr)) {
+            clientCount = Integer.parseInt(clientCntStr);
+            // without at least one client nothing is sent and the required
+            // total computed from clientCount becomes meaningless
+            if (clientCount < 1) {
+                throw new Exception("Invalid "
+                        + CliArgDef.CLIENTCOUNT.longOpt
+                        + " parameter value!");
+            }
+        }
+        String printIntMsStr = cli.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt);
+        if (TStringUtils.isNotBlank(printIntMsStr)) {
+            printIntervalMs = Long.parseLong(printIntMsStr);
+            if (printIntervalMs < 5000) {
+                throw new Exception("Invalid "
+                        + CliArgDef.OUTPUTINTERVAL.longOpt
+                        + " parameter value!");
+            }
+        }
+        if (cli.hasOption(CliArgDef.SYNCPRODUCE.longOpt)) {
+            syncProduction = true;
+        }
+        if (cli.hasOption(CliArgDef.WITHOUTDELAY.longOpt)) {
+            withoutDelay = true;
+        }
+        return true;
+    }
+    /**
+     * Builds the payload, the topic/filter send rounds, the sender thread pool
+     * and one producer per client, then submits a sender task for each producer.
+     *
+     * With connection reuse a single TubeSingleSessionFactory serves all clients;
+     * otherwise each client gets its own TubeMultiSessionFactory.
+     *
+     * @throws Exception  if a factory or producer cannot be created or published
+     */
+    public void initTask() throws Exception {
+        // initial sent data
+        sentData = MixedUtils.buildTestData(msgDataSize);
+        // initial client configure
+        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
+        clientConfig.setRpcTimeoutMs(rpcTimeoutMs);
+        // initial topic send round: one entry per topic without filters,
+        // otherwise one entry per (topic, filter) pair
+        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                topicSendRounds.add(new TupleValue(entry.getKey()));
+            } else {
+                for (String filter : entry.getValue()) {
+                    topicSendRounds.add(new TupleValue(entry.getKey(), filter));
+                }
+            }
+        }
+        // initial send thread service
+        // NOTE(review): the pool is sized by sendThreadCnt while one task is
+        // submitted per client, so senders beyond the pool size wait in the queue.
+        sendExecutorService =
+                Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable runnable) {
+                        return new Thread(runnable, "sender_" + producerMap.size());
+                    }
+                });
+        // if reuse connection, use TubeSingleSessionFactory class
+        MessageSessionFactory sharedFactory = null;
+        if (reuseConn) {
+            sharedFactory = new TubeSingleSessionFactory(clientConfig);
+            this.sessionFactoryList.add(sharedFactory);
+        }
+        // initial producer object
+        for (int i = 0; i < clientCount; i++) {
+            MessageSessionFactory msgSessionFactory = sharedFactory;
+            if (msgSessionFactory == null) {
+                // if not reuse connection, use TubeMultiSessionFactory class
+                msgSessionFactory = new TubeMultiSessionFactory(clientConfig);
+                this.sessionFactoryList.add(msgSessionFactory);
+            }
+            MessageProducer producer = msgSessionFactory.createProducer();
+            producer.publish(topicAndFiltersMap.keySet());
+            MsgSender sender = new MsgSender(producer);
+            producerMap.put(producer, sender);
+            // submit send task
+            sendExecutorService.submit(sender);
+        }
+        isStarted = true;
+    }
+
+ public void shutdown() throws Throwable {
+ // stop process
+ if (sendExecutorService != null) {
+ sendExecutorService.shutdownNow();
+ }
+ ThreadUtils.sleep(20);
+ for (MessageProducer producer : producerMap.keySet()) {
+ producer.shutdown();
+ }
+ for (MessageSessionFactory messageSessionFactory : sessionFactoryList) {
+ messageSessionFactory.shutdown();
+ }
+ }
+
+ // process message send
+ public class MsgSender implements Runnable {
+
+ private final MessageProducer producer;
+
+ public MsgSender(MessageProducer producer) {
+ this.producer = producer;
+ }
+
+ @Override
+ public void run() {
+ int topicAndCondCnt = topicSendRounds.size();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
+ long sentCount = 0;
+ int roundIndex = 0;
+ while (msgCount < 0 || sentCount < msgCount) {
+ roundIndex = (int) (sentCount++ % topicAndCondCnt);
+ try {
+ long millis = System.currentTimeMillis();
+ TupleValue tupleValue = topicSendRounds.get(roundIndex);
+ Message message = new Message(tupleValue.topic, sentData);
+ if (tupleValue.filter != null) {
+ // if include filter, add filter item
+ message.putSystemHeader(tupleValue.filter, sdf.format(new Date(millis)));
+ }
+ // use sync or async process
+ if (syncProduction) {
+ MessageSentResult procResult =
+ producer.sendMessage(message);
+ TOTAL_COUNTER.incrementAndGet();
+ if (procResult.isSuccess()) {
+ SENT_SUCC_COUNTER.incrementAndGet();
+ } else {
+ SENT_FAIL_COUNTER.incrementAndGet();
+ }
+ } else {
+ producer.sendMessage(message, new DefaultSendCallback());
+ }
+ } catch (Throwable e1) {
+ TOTAL_COUNTER.incrementAndGet();
+ SENT_EXCEPT_COUNTER.incrementAndGet();
+ logger.error("sendMessage exception: ", e1);
+ }
+ // Limit sending flow control to avoid frequent errors
+ // caused by too many inflight messages being sent
+ if (!withoutDelay) {
+ if (sentCount % 5000 == 0) {
+ ThreadUtils.sleep(3000);
+ } else if (sentCount % 4000 == 0) {
+ ThreadUtils.sleep(2000);
+ } else if (sentCount % 2000 == 0) {
+ ThreadUtils.sleep(800);
+ } else if (sentCount % 1000 == 0) {
+ ThreadUtils.sleep(400);
+ }
+ }
+ }
+ // finished, close client
+ try {
+ producer.shutdown();
+ } catch (Throwable e) {
+ logger.error("producer shutdown error: ", e);
+ }
+ }
+ }
+
+ private class DefaultSendCallback implements MessageSentCallback {
+ @Override
+ public void onMessageSent(MessageSentResult result) {
+ TOTAL_COUNTER.incrementAndGet();
+ if (result.isSuccess()) {
+ SENT_SUCC_COUNTER.incrementAndGet();
+ } else {
+ SENT_FAIL_COUNTER.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ TOTAL_COUNTER.incrementAndGet();
+ SENT_EXCEPT_COUNTER.incrementAndGet();
+ logger.error("Send message error!", e);
+ }
+ }
+
+    /**
+     * One send target: a topic and, optionally, the filter item to attach to
+     * the message (null when the topic has no filter).
+     */
+    private static class TupleValue {
+        public String topic;
+        public String filter;
+
+        public TupleValue(String topic) {
+            this(topic, null);
+        }
+
+        public TupleValue(String topic, String filter) {
+            this.filter = filter;
+            this.topic = topic;
+        }
+
+    }
+
+    /**
+     * Entry point of the producer tool: parses the arguments, starts the
+     * senders, prints the progress periodically until the required number of
+     * send results has been counted, then shuts everything down.
+     *
+     * The printed statistics are "total (success:failure:exception)".
+     *
+     * @param args  the raw command-line arguments
+     */
+    public static void main(String[] args) {
+        CliProducer cliProducer = new CliProducer();
+        try {
+            boolean result = cliProducer.parseParams(args);
+            if (!result) {
+                throw new Exception("Parse parameters failure!");
+            }
+            cliProducer.initTask();
+            ThreadUtils.sleep(1000);
+            // both factors are fixed once the parameters are parsed
+            final long requiredCount = cliProducer.msgCount * cliProducer.clientCount;
+            // a negative msgCount means "run until the process is stopped"
+            while (cliProducer.msgCount < 0
+                    || TOTAL_COUNTER.get() < requiredCount) {
+                ThreadUtils.sleep(cliProducer.printIntervalMs);
+                System.out.println("Required send count VS sent message count = "
+                        + requiredCount
+                        + " : " + TOTAL_COUNTER.get()
+                        + " (" + SENT_SUCC_COUNTER.get()
+                        + ":" + SENT_FAIL_COUNTER.get()
+                        + ":" + SENT_EXCEPT_COUNTER.get()
+                        + ")");
+            }
+            cliProducer.shutdown();
+            System.out.println("Finished, required send count VS sent message count = "
+                    + requiredCount
+                    + " : " + TOTAL_COUNTER.get()
+                    + " (" + SENT_SUCC_COUNTER.get()
+                    + ":" + SENT_FAIL_COUNTER.get()
+                    + ":" + SENT_EXCEPT_COUNTER.get()
+                    + ")");
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            // pass the throwable so the log keeps the stack trace even when
+            // the exception carries no message
+            logger.error(ex.getMessage(), ex);
+            // NOTE(review): help() ends the JVM with status 0, so failures are
+            // not visible in the exit code.
+            cliProducer.help();
+        }
+
+    }
+
+
+}