You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:06:59 UTC
[incubator-tubemq] 01/49: [TUBEMQ-433] add tubemq
perf-consumer/producer scripts (#330)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 45f70fc0d875362af51d695d5cb5c2765d34408f
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Thu Dec 3 19:12:41 2020 +0800
[TUBEMQ-433] add tubemq perf-consumer/producer scripts (#330)
---
bin/tubemq-consumer-perf-test.sh | 40 +++++++
bin/tubemq-producer-perf-test.sh | 40 +++++++
pom.xml | 2 +-
tubemq-example/pom.xml | 10 ++
tubemq-example/src/main/assembly/assembly.xml | 3 +
.../apache/tubemq/example/ArgsParserHelper.java | 48 ++++++++
.../tubemq/example/MAMessageProducerExample.java | 100 +++++++++++------
.../tubemq/example/MessageConsumerExample.java | 125 ++++++++++++++-------
.../org/apache/tubemq/server/tools/ToolUtils.java | 4 +-
9 files changed, 295 insertions(+), 77 deletions(-)
diff --git a/bin/tubemq-consumer-perf-test.sh b/bin/tubemq-consumer-perf-test.sh
new file mode 100644
index 0000000..54189a2
--- /dev/null
+++ b/bin/tubemq-consumer-perf-test.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "$BASE_DIR" ] ; then
+ PRG="$0"
+
+ # follow symlinks (possibly relative) until PRG is the real script path
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+ BASE_DIR=`dirname "$PRG"`/..
+
+ # turn BASE_DIR into an absolute path
+ BASE_DIR=`cd "$BASE_DIR" && pwd`
+ #echo "TubeMQ master is at $BASE_DIR"
+fi
+source "$BASE_DIR/bin/env.sh"
+$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample "$@"
diff --git a/bin/tubemq-producer-perf-test.sh b/bin/tubemq-producer-perf-test.sh
new file mode 100644
index 0000000..ed76c3a
--- /dev/null
+++ b/bin/tubemq-producer-perf-test.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "$BASE_DIR" ] ; then
+ PRG="$0"
+
+ # follow symlinks (possibly relative) until PRG is the real script path
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+ BASE_DIR=`dirname "$PRG"`/..
+
+ # turn BASE_DIR into an absolute path
+ BASE_DIR=`cd "$BASE_DIR" && pwd`
+ #echo "TubeMQ master is at $BASE_DIR"
+fi
+source "$BASE_DIR/bin/env.sh"
+$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample "$@"
diff --git a/pom.xml b/pom.xml
index 5a6fe4f..490fe15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -312,7 +312,7 @@
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
- <version>1.2</version>
+ <version>1.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
diff --git a/tubemq-example/pom.xml b/tubemq-example/pom.xml
index 1095f7b..47aa921 100644
--- a/tubemq-example/pom.xml
+++ b/tubemq-example/pom.xml
@@ -65,6 +65,16 @@
<groupId>org.apache.tubemq</groupId>
<artifactId>tubemq-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml
index 11edd32..596af24 100644
--- a/tubemq-example/src/main/assembly/assembly.xml
+++ b/tubemq-example/src/main/assembly/assembly.xml
@@ -32,6 +32,9 @@
<directory>../</directory>
<includes>
<include>./conf/tools.log4j.properties</include>
+ <include>./bin/tubemq-consumer-perf-test.sh</include>
+ <include>./bin/tubemq-producer-perf-test.sh</include>
+ <include>./bin/env.sh</include>
<include>LICENSE</include>
<include>NOTICE</include>
<include>DISCLAIMER-WIP</include>
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
new file mode 100644
index 0000000..c507ae4
--- /dev/null
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.example;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+public class ArgsParserHelper {
+
+ /**
+ * Print usage for the given command and exit the JVM with status 0.
+ * @param commandName - command name handed to the help formatter
+ * @param opts - options
+ */
+ public static void help(String commandName, Options opts) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(commandName, opts);
+ System.exit(0);
+ }
+
+ /**
+ * Init the options shared by the example tools: help, master-list and topic.
+ * @return - options
+ */
+ public static Options initCommonOptions() {
+ Options options = new Options();
+ options.addOption(null, "help", false, "show help");
+ options.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000");
+ options.addOption(null, "topic", true, "topic list, topic1,topic2 or "
+ + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)");
+ return options;
+ }
+}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index f76b077..9c76ce1 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -30,8 +30,13 @@ import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
@@ -59,7 +64,7 @@ public class MAMessageProducerExample {
private static final int SESSION_FACTORY_NUM = 10;
private static Set<String> topicSet;
- private static int msgCount;
+ private static int batchCount;
private static int producerCount;
private static byte[] sendData;
@@ -89,45 +94,72 @@ public class MAMessageProducerExample {
}
}
- public static void main(String[] args) {
- final String masterHostAndPort = args[0];
-
- final String topics = args[1];
- final List<String> topicList = Arrays.asList(topics.split(","));
-
- topicSet = new TreeSet<>(topicList);
-
- msgCount = Integer.parseInt(args[2]);
- producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-
- logger.info("MAMessageProducerExample.main started...");
+ /**
+ * Build the producer options: the common ones plus batch-size, max-batch and thread-num.
+ * Defaults quoted for batch-size and max-batch mirror the fallbacks main() passes to getOptionValue.
+ * @return options
+ */
+ public static Options initOptions() {
+ Options options = ArgsParserHelper.initCommonOptions();
+ options.addOption(null, "batch-size", true, "number of messages in single batch, default is 1024");
+ options.addOption(null, "max-batch", true, "max batch number, default is 100000");
+ options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100");
+ return options;
+ }
- final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
- final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
+ public static void main(String[] args) {
+ Options options = null;
+ try {
+ CommandLineParser parser = new DefaultParser();
+ options = initOptions();
+ CommandLine cl = parser.parse(options, args);
+ if (cl != null) {
+ final String masterHostAndPort = cl.getOptionValue("master-list");
+ final String topics = cl.getOptionValue("topic");
+ final List<String> topicList = Arrays.asList(topics.split(","));
+ topicSet = new TreeSet<>(topicList);
+
+ batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000"));
+ int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024"));
+ producerCount = Math.min(Integer.parseInt(cl.getOptionValue(
+ "thread-num", "1")), MAX_PRODUCER_NUM);
+ logger.info("MAMessageProducerExample.main started...");
+ final byte[] transmitData = StringUtils
+ .getBytesUtf8("This is a test message from multi-session factory.");
+ final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize);
+
+ while (dataBuffer.hasRemaining()) {
+ int offset = dataBuffer.arrayOffset();
+ dataBuffer.put(transmitData, offset,
+ Math.min(dataBuffer.remaining(), transmitData.length));
+ }
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
- }
+ dataBuffer.flip();
+ sendData = dataBuffer.array();
- dataBuffer.flip();
- sendData = dataBuffer.array();
+ try {
+ MAMessageProducerExample messageProducer = new MAMessageProducerExample(
+ masterHostAndPort);
- try {
- MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
+ messageProducer.startService();
- messageProducer.startService();
+ while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) {
+ TimeUnit.MILLISECONDS.sleep(1000);
+ }
+ messageProducer.producerMap.clear();
+ messageProducer.shutdown();
- while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
- Thread.sleep(1000);
+ } catch (TubeClientException e) {
+ logger.error("TubeClientException: ", e);
+ } catch (Throwable e) {
+ logger.error("Throwable: ", e);
+ }
+ }
+ } catch (Exception ex) {
+ logger.error(ex.getMessage(), ex);
+ if (options != null) {
+ ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options);
}
- messageProducer.producerMap.clear();
- messageProducer.shutdown();
-
- } catch (TubeClientException e) {
- logger.error("TubeClientException: ", e);
- } catch (Throwable e) {
- logger.error("Throwable: ", e);
}
}
@@ -173,7 +205,7 @@ public class MAMessageProducerExample {
} catch (Throwable t) {
logger.error("publish exception: ", t);
}
- for (int i = 0; i < msgCount; i++) {
+ for (int i = 0; i < batchCount; i++) {
long millis = System.currentTimeMillis();
for (String topic : topicSet) {
try {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index d9aeb8a..3b99943 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -26,6 +26,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -59,29 +63,45 @@ public final class MessageConsumerExample {
private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
private final PushMessageConsumer messageConsumer;
- private final MessageSessionFactory messageSessionFactory;
- public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
+ public MessageConsumerExample(String masterHostAndPort, String group,
+ int fetchCount, boolean isFromBegin) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ if (isFromBegin) {
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+ } else {
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ }
if (fetchCount > 0) {
consumerConfig.setPushFetchThreadCnt(fetchCount);
}
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
}
- public static void main(String[] args) {
- final String masterHostAndPort = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int consumerCount = Integer.parseInt(args[3]);
- int fetchCount = -1;
- if (args.length > 5) {
- fetchCount = Integer.parseInt(args[4]);
- }
- final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
+ /**
+ * Build the consumer options: the common ones plus batch-size, thread-num, group and from-begin.
+ * @return options
+ */
+ public static Options initOptions() {
+ // NOTE(review): main() feeds batch-size into setPushFetchThreadCnt via fetchCount; confirm the help wording.
+ Options options = ArgsParserHelper.initCommonOptions();
+ options.addOption(null, "batch-size", true, "max number of fetching message in one batch");
+ options.addOption(null, "thread-num", true, "thread number of consumers");
+ options.addOption(null, "group", true, "consumer group");
+ options.addOption(null, "from-begin", false, "default is consuming from latest, "
+ + "if option is specified, then consume from begin");
+ return options;
+ }
+
+ /**
+ * init topic->set(tid) map
+ * @param topics - topics string
+ * @return - map of topic->set(tid)
+ */
+ private static Map<String, TreeSet<String>> initTopicList(String topics) {
+ Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
String[] topicTidsList = topics.split(",");
for (String topicTids : topicTidsList) {
String[] topicTidStr = topicTids.split(":");
@@ -95,35 +115,60 @@ public final class MessageConsumerExample {
}
topicTidsMap.put(topicTidStr[0], tids);
}
- final int startFetchCount = fetchCount;
- final ExecutorService executorService = Executors.newFixedThreadPool(fetchCount);
- for (int i = 0; i < consumerCount; i++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- MessageConsumerExample messageConsumer = new MessageConsumerExample(
- masterHostAndPort,
- group,
- startFetchCount
- );
- messageConsumer.subscribe(topicTidsMap);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
- }
- final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
- statisticThread.start();
+ return topicTidsMap;
+ }
- executorService.shutdown();
+ public static void main(String[] args) {
+ Options options = null;
try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
+ CommandLineParser parser = new DefaultParser();
+ options = initOptions();
+ CommandLine cl = parser.parse(options, args);
+ if (cl != null) {
+ final String masterHostAndPort = cl.getOptionValue("master-list");
+ final Map<String, TreeSet<String>> topicTidsMap = initTopicList(
+ cl.getOptionValue("topic"));
+ final String group = cl.getOptionValue("group");
+ int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1"));
+ final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1"));
+ final boolean isFromBegin = cl.hasOption("from-begin");
+ ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
+ for (int i = 0; i < threadNum; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ MessageConsumerExample messageConsumer = new MessageConsumerExample(
+ masterHostAndPort,
+ group,
+ fetchCount,
+ isFromBegin
+ );
+ messageConsumer.subscribe(topicTidsMap);
+ } catch (Exception e) {
+ logger.error("Create consumer failed!", e);
+ }
+ }
+ });
+ }
+ final Thread statisticThread = new Thread(msgRecvStats,
+ "Received Statistic Thread");
+ statisticThread.start();
+
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Thread Pool shutdown has been interrupted!");
+ }
+ msgRecvStats.stopStats();
+ }
+ } catch (Exception ex) {
+ logger.error(ex.getMessage(), ex.getMessage());
+ if (options != null) {
+ ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options);
+ }
}
- msgRecvStats.stopStats();
}
public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
index d2c503a..9c343a8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
@@ -19,10 +19,10 @@ package org.apache.tubemq.server.tools;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.exception.StartupException;
@@ -56,7 +56,7 @@ public class ToolUtils {
final Options options = new Options();
final Option file = new Option("f", true, "configuration file path");
options.addOption(file);
- final CommandLineParser parser = new PosixParser();
+ final CommandLineParser parser = new DefaultParser();
CommandLine line = null;
try {
line = parser.parse(options, args);