You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/10/20 00:39:56 UTC

[rocketmq] branch develop updated: [ISSUE #2146] Add benchmark shutdown script, add more print info, add consumer threand count command option (#2150)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3be7033  [ISSUE #2146] Add benchmark shutdown script, add more print info, add consumer threand count command option (#2150)
3be7033 is described below

commit 3be7033a16dba17153dd3c58fec7f43ae04c25e0
Author: 张旭 <ma...@gmail.com>
AuthorDate: Tue Oct 20 08:39:36 2020 +0800

    [ISSUE #2146] Add benchmark shutdown script, add more print info, add consumer threand count command option (#2150)
    
    Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
 distribution/benchmark/shutdown.sh                 | 63 ++++++++++++++++++++++
 .../rocketmq/example/benchmark/Consumer.java       | 15 ++++--
 .../rocketmq/example/benchmark/Producer.java       |  4 +-
 .../example/benchmark/TransactionProducer.java     |  4 +-
 4 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/distribution/benchmark/shutdown.sh b/distribution/benchmark/shutdown.sh
new file mode 100644
index 0000000..9ecd326
--- /dev/null
+++ b/distribution/benchmark/shutdown.sh
@@ -0,0 +1,63 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Usage: shutdown.sh producer | consumer | tproducer
+# Finds the matching benchmark java process and sends it kill's default signal (SIGTERM); exits 1 on failure.
+case $1 in
+    producer)
+    # pid(s) of java processes whose command line mentions the benchmark Producer class
+    pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Producer' |grep java | grep -v grep | awk '{print $1}'`
+    if [ -z "$pid" ] ; then
+            echo "No benchmark producer running."
+            exit 1
+    fi
+
+    echo "The benchmark producer(${pid}) is running..."
+
+    kill ${pid}
+    echo "Send shutdown request to benchmark producer(${pid}) OK"
+    ;;
+    consumer)
+    # pid(s) of java processes whose command line mentions the benchmark Consumer class
+    pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Consumer' |grep java | grep -v grep | awk '{print $1}'`
+    if [ -z "$pid" ] ; then
+            echo "No benchmark consumer running."
+            exit 1
+    fi
+
+    echo "The benchmark consumer(${pid}) is running..."
+
+    kill ${pid}
+    echo "Send shutdown request to benchmark consumer(${pid}) OK"
+    ;;
+    tproducer)
+    # pid(s) of java processes whose command line mentions the benchmark TransactionProducer class
+    pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.TransactionProducer' |grep java | grep -v grep | awk '{print $1}'`
+    if [ -z "$pid" ] ; then
+            echo "No benchmark transaction producer running."
+            exit 1
+    fi
+
+    echo "The benchmark transaction producer(${pid}) is running..."
+
+    kill ${pid}
+    echo "Send shutdown request to benchmark transaction producer(${pid}) OK"
+    ;;
+    *)
+    echo "Usage: shutdown producer | consumer | tproducer"
+    exit 1
+esac
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index c0a2a8b..b6c6ae4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -52,6 +52,7 @@ public class Consumer {
         }
 
         final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+        final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20;
         final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
         final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
         final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
@@ -65,8 +66,8 @@ public class Consumer {
             group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
         }
 
-        System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
-            topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
+        System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
+            topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
 
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
 
@@ -101,8 +102,8 @@ public class Consumer {
                     statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
                     statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
 
-                    System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
-                            consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
+                    System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
+                            System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
                     );
                 }
             }
@@ -123,6 +124,8 @@ public class Consumer {
             String ns = commandLine.getOptionValue('n');
             consumer.setNamesrvAddr(ns);
         }
+        consumer.setConsumeThreadMin(threadCount);
+        consumer.setConsumeThreadMax(threadCount);
         consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         if (filterType == null || expression == null) {
@@ -179,6 +182,10 @@ public class Consumer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("w", "threadCount", true, "Thread count, Default: 20");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer");
         opt.setRequired(false);
         options.addOption(opt);
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index dbad169..93271cb 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -89,8 +89,8 @@ public class Producer {
                     final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
                     final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
 
-                    System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
-                        sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+                    System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
+                        System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
                 }
             }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 951b718..85af04e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -105,8 +105,8 @@ public class TransactionProducer {
                     final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;
 
                     System.out.printf(
-                        "Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
-                            sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
+                        "Current Time: %s Send TPS:%5d Max RT(ms):%5d AVG RT(ms):%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
+                            System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
                             unexpectedCheck, dupCheck);
                     statsBenchmark.getSendMessageMaxRT().set(0);
                 }