You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:06:59 UTC

[incubator-tubemq] 01/49: [TUBEMQ-433] add tubemq perf-consumer/producer scripts (#330)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 45f70fc0d875362af51d695d5cb5c2765d34408f
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Thu Dec 3 19:12:41 2020 +0800

    [TUBEMQ-433] add tubemq perf-consumer/producer scripts (#330)
---
 bin/tubemq-consumer-perf-test.sh                   |  40 +++++++
 bin/tubemq-producer-perf-test.sh                   |  40 +++++++
 pom.xml                                            |   2 +-
 tubemq-example/pom.xml                             |  10 ++
 tubemq-example/src/main/assembly/assembly.xml      |   3 +
 .../apache/tubemq/example/ArgsParserHelper.java    |  48 ++++++++
 .../tubemq/example/MAMessageProducerExample.java   | 100 +++++++++++------
 .../tubemq/example/MessageConsumerExample.java     | 125 ++++++++++++++-------
 .../org/apache/tubemq/server/tools/ToolUtils.java  |   4 +-
 9 files changed, 295 insertions(+), 77 deletions(-)

diff --git a/bin/tubemq-consumer-perf-test.sh b/bin/tubemq-consumer-perf-test.sh
new file mode 100644
index 0000000..54189a2
--- /dev/null
+++ b/bin/tubemq-consumer-perf-test.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "$BASE_DIR" ] ; then
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+  BASE_DIR=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  BASE_DIR=`cd "$BASE_DIR" && pwd`
+  #echo "TubeMQ master is at $BASE_DIR"
+fi
+source $BASE_DIR/bin/env.sh
+$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample $@
diff --git a/bin/tubemq-producer-perf-test.sh b/bin/tubemq-producer-perf-test.sh
new file mode 100644
index 0000000..ed76c3a
--- /dev/null
+++ b/bin/tubemq-producer-perf-test.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "$BASE_DIR" ] ; then
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+  BASE_DIR=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  BASE_DIR=`cd "$BASE_DIR" && pwd`
+  #echo "TubeMQ master is at $BASE_DIR"
+fi
+source $BASE_DIR/bin/env.sh
+$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample $@
diff --git a/pom.xml b/pom.xml
index 5a6fe4f..490fe15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -312,7 +312,7 @@
             <dependency>
                 <groupId>commons-cli</groupId>
                 <artifactId>commons-cli</artifactId>
-                <version>1.2</version>
+                <version>1.4</version>
             </dependency>
             <dependency>
                 <groupId>commons-codec</groupId>
diff --git a/tubemq-example/pom.xml b/tubemq-example/pom.xml
index 1095f7b..47aa921 100644
--- a/tubemq-example/pom.xml
+++ b/tubemq-example/pom.xml
@@ -65,6 +65,16 @@
             <groupId>org.apache.tubemq</groupId>
             <artifactId>tubemq-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml
index 11edd32..596af24 100644
--- a/tubemq-example/src/main/assembly/assembly.xml
+++ b/tubemq-example/src/main/assembly/assembly.xml
@@ -32,6 +32,9 @@
             <directory>../</directory>
             <includes>
                 <include>./conf/tools.log4j.properties</include>
+                <include>./bin/tubemq-consumer-perf-test.sh</include>
+                <include>./bin/tubemq-producer-perf-test.sh</include>
+                <include>./bin/env.sh</include>
                 <include>LICENSE</include>
                 <include>NOTICE</include>
                 <include>DISCLAIMER-WIP</include>
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
new file mode 100644
index 0000000..c507ae4
--- /dev/null
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.example;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+public class ArgsParserHelper {
+
+    /**
+     * Print the usage text for a command and terminate the JVM with exit status 0.
+     * @param commandName - name shown in the usage line
+     * @param opts - options to describe
+     */
+    public static void help(String commandName, Options opts) {
+        new HelpFormatter().printHelp(commandName, opts);
+        System.exit(0);
+    }
+
+    /**
+     * Build the options shared by the example tools: help, master-list and topic.
+     * @return - a new Options instance holding the shared options
+     */
+    public static Options initCommonOptions() {
+        final String topicDesc = "topic list, topic1,topic2 or "
+                + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)";
+        Options commonOpts = new Options();
+        commonOpts.addOption(null, "help", false, "show help");
+        commonOpts.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000");
+        commonOpts.addOption(null, "topic", true, topicDesc);
+        return commonOpts;
+    }
+}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index f76b077..9c76ce1 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -30,8 +30,13 @@ import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.tubemq.client.config.TubeClientConfig;
 import org.apache.tubemq.client.exception.TubeClientException;
@@ -59,7 +64,7 @@ public class MAMessageProducerExample {
     private static final int SESSION_FACTORY_NUM = 10;
 
     private static Set<String> topicSet;
-    private static int msgCount;
+    private static int batchCount;
     private static int producerCount;
     private static byte[] sendData;
 
@@ -89,45 +94,72 @@ public class MAMessageProducerExample {
         }
     }
 
-    public static void main(String[] args) {
-        final String masterHostAndPort = args[0];
-
-        final String topics = args[1];
-        final List<String> topicList = Arrays.asList(topics.split(","));
-
-        topicSet = new TreeSet<>(topicList);
-
-        msgCount = Integer.parseInt(args[2]);
-        producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-
-        logger.info("MAMessageProducerExample.main started...");
+    /**
+     * Init the producer options: the common ones plus batch-size, max-batch and thread-num.
+     * Defaults quoted here match main()'s fallbacks: batch-size 1024, max-batch 100000.
+     * @return options
+     */
+    public static Options initOptions() {
+        Options options = ArgsParserHelper.initCommonOptions();
+        options.addOption(null, "batch-size", true, "number of messages in single batch, default is 1024");
+        options.addOption(null, "max-batch", true, "max batch number, default is 100000");
+        options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100");
+        return options;
+    }
 
-        final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
-        final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
+    public static void main(String[] args) {
+        Options options = null;
+        try {
+            CommandLineParser parser = new DefaultParser();
+            options = initOptions();
+            CommandLine cl = parser.parse(options, args);
+            if (cl != null) {
+                final String masterHostAndPort = cl.getOptionValue("master-list");
+                final String topics = cl.getOptionValue("topic");
+                final List<String> topicList = Arrays.asList(topics.split(","));
+                topicSet = new TreeSet<>(topicList);
+
+                batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000"));
+                int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024"));
+                producerCount = Math.min(Integer.parseInt(cl.getOptionValue(
+                        "thread-num", "1")), MAX_PRODUCER_NUM);
+                logger.info("MAMessageProducerExample.main started...");
+                final byte[] transmitData = StringUtils
+                        .getBytesUtf8("This is a test message from multi-session factory.");
+                final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize);
+
+                while (dataBuffer.hasRemaining()) {
+                    int offset = dataBuffer.arrayOffset();
+                    dataBuffer.put(transmitData, offset,
+                            Math.min(dataBuffer.remaining(), transmitData.length));
+                }
 
-        while (dataBuffer.hasRemaining()) {
-            int offset = dataBuffer.arrayOffset();
-            dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
-        }
+                dataBuffer.flip();
+                sendData = dataBuffer.array();
 
-        dataBuffer.flip();
-        sendData = dataBuffer.array();
+                try {
+                    MAMessageProducerExample messageProducer = new MAMessageProducerExample(
+                            masterHostAndPort);
 
-        try {
-            MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
+                    messageProducer.startService();
 
-            messageProducer.startService();
+                    while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) {
+                        TimeUnit.MILLISECONDS.sleep(1000);
+                    }
+                    messageProducer.producerMap.clear();
+                    messageProducer.shutdown();
 
-            while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
-                Thread.sleep(1000);
+                } catch (TubeClientException e) {
+                    logger.error("TubeClientException: ", e);
+                } catch (Throwable e) {
+                    logger.error("Throwable: ", e);
+                }
+            }
+        } catch (Exception ex) { // unparsable or invalid arguments: log the cause, then show usage
+            logger.error(ex.getMessage(), ex);
+            if (options != null) {
+                ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options);
             }
-            messageProducer.producerMap.clear();
-            messageProducer.shutdown();
-
-        } catch (TubeClientException e) {
-            logger.error("TubeClientException: ", e);
-        } catch (Throwable e) {
-            logger.error("Throwable: ", e);
         }
     }
 
@@ -173,7 +205,7 @@ public class MAMessageProducerExample {
             } catch (Throwable t) {
                 logger.error("publish exception: ", t);
             }
-            for (int i = 0; i < msgCount; i++) {
+            for (int i = 0; i < batchCount; i++) {
                 long millis = System.currentTimeMillis();
                 for (String topic : topicSet) {
                     try {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index d9aeb8a..3b99943 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -26,6 +26,10 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.apache.tubemq.client.common.PeerInfo;
 import org.apache.tubemq.client.config.ConsumerConfig;
 import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -59,29 +63,45 @@ public final class MessageConsumerExample {
     private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
 
     private final PushMessageConsumer messageConsumer;
-    private final MessageSessionFactory messageSessionFactory;
 
-    public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
+    public MessageConsumerExample(String masterHostAndPort, String group,
+            int fetchCount, boolean isFromBegin) throws Exception { // fetchCount <= 0: thread count not set
         ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
-        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        if (isFromBegin) { // true: consume from the first offset; false: consume from the latest
+            consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+        } else {
+            consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        }
         if (fetchCount > 0) {
             consumerConfig.setPushFetchThreadCnt(fetchCount);
         }
-        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+        MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
     }
 
-    public static void main(String[] args) {
-        final String masterHostAndPort = args[0];
-        final String topics = args[1];
-        final String group = args[2];
-        final int consumerCount = Integer.parseInt(args[3]);
-        int fetchCount = -1;
-        if (args.length > 5) {
-            fetchCount = Integer.parseInt(args[4]);
-        }
-        final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
+    /**
+     * Init the consumer options: the common ones plus batch-size, thread-num, group and
+     * from-begin. main() hands batch-size to the consumer as its push fetch thread count.
+     * @return options
+     */
+    public static Options initOptions() {
+        Options options = ArgsParserHelper.initCommonOptions();
+        options.addOption(null, "batch-size", true, "fetch thread number of each push consumer, "
+                + "not set if absent or not positive");
+        options.addOption(null, "thread-num", true, "thread number of consumers, default is 1");
+        options.addOption(null, "group", true, "consumer group");
+        options.addOption(null, "from-begin", false, "default is consuming from latest, "
+                + "if option is specified, then consume from begin");
+        return options;
+    }
+
+    /**
+     * init topic->set(tid) map
+     * @param topics - topics string
+     * @return - map of topic->set(tid)
+     */
+    private static Map<String, TreeSet<String>> initTopicList(String topics) {
+        Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
         String[] topicTidsList = topics.split(",");
         for (String topicTids : topicTidsList) {
             String[] topicTidStr = topicTids.split(":");
@@ -95,35 +115,60 @@ public final class MessageConsumerExample {
             }
             topicTidsMap.put(topicTidStr[0], tids);
         }
-        final int startFetchCount = fetchCount;
-        final ExecutorService executorService = Executors.newFixedThreadPool(fetchCount);
-        for (int i = 0; i < consumerCount; i++) {
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        MessageConsumerExample messageConsumer = new MessageConsumerExample(
-                                masterHostAndPort,
-                                group,
-                                startFetchCount
-                        );
-                        messageConsumer.subscribe(topicTidsMap);
-                    } catch (Exception e) {
-                        logger.error("Create consumer failed!", e);
-                    }
-                }
-            });
-        }
-        final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
-        statisticThread.start();
+        return topicTidsMap;
+    }
 
-        executorService.shutdown();
+    public static void main(String[] args) {
+        Options options = null;
         try {
-            executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            logger.error("Thread Pool shutdown has been interrupted!");
+            CommandLineParser parser = new DefaultParser();
+            options = initOptions();
+            CommandLine cl = parser.parse(options, args);
+            if (cl != null) {
+                final String masterHostAndPort = cl.getOptionValue("master-list");
+                final Map<String, TreeSet<String>> topicTidsMap = initTopicList(
+                        cl.getOptionValue("topic"));
+                final String group = cl.getOptionValue("group");
+                int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1"));
+                final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1"));
+                final boolean isFromBegin = cl.hasOption("from-begin");
+                ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
+                for (int i = 0; i < threadNum; i++) {
+                    executorService.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                MessageConsumerExample messageConsumer = new MessageConsumerExample(
+                                        masterHostAndPort,
+                                        group,
+                                        fetchCount,
+                                        isFromBegin
+                                );
+                                messageConsumer.subscribe(topicTidsMap);
+                            } catch (Exception e) {
+                                logger.error("Create consumer failed!", e);
+                            }
+                        }
+                    });
+                }
+                final Thread statisticThread = new Thread(msgRecvStats,
+                        "Received Statistic Thread");
+                statisticThread.start();
+
+                executorService.shutdown();
+                try {
+                    executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    logger.error("Thread Pool shutdown has been interrupted!");
+                }
+                msgRecvStats.stopStats();
+            }
+        } catch (Exception ex) {
+            logger.error(ex.getMessage(), ex.getMessage());
+            if (options != null) {
+                ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options);
+            }
         }
-        msgRecvStats.stopStats();
     }
 
     public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
index d2c503a..9c343a8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
@@ -19,10 +19,10 @@ package org.apache.tubemq.server.tools;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.exception.StartupException;
@@ -56,7 +56,7 @@ public class ToolUtils {
         final Options options = new Options();
         final Option file = new Option("f", true, "configuration file path");
         options.addOption(file);
-        final CommandLineParser parser = new PosixParser();
+        final CommandLineParser parser = new DefaultParser();
         CommandLine line = null;
         try {
             line = parser.parse(options, args);