You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/10/20 00:39:56 UTC
[rocketmq] branch develop updated: [ISSUE #2146] Add benchmark
shutdown script, add more print info,
add consumer threand count command option (#2150)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3be7033 [ISSUE #2146] Add benchmark shutdown script, add more print info, add consumer threand count command option (#2150)
3be7033 is described below
commit 3be7033a16dba17153dd3c58fec7f43ae04c25e0
Author: 张旭 <ma...@gmail.com>
AuthorDate: Tue Oct 20 08:39:36 2020 +0800
[ISSUE #2146] Add benchmark shutdown script, add more print info, add consumer threand count command option (#2150)
Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
distribution/benchmark/shutdown.sh | 63 ++++++++++++++++++++++
.../rocketmq/example/benchmark/Consumer.java | 15 ++++--
.../rocketmq/example/benchmark/Producer.java | 4 +-
.../example/benchmark/TransactionProducer.java | 4 +-
4 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/distribution/benchmark/shutdown.sh b/distribution/benchmark/shutdown.sh
new file mode 100644
index 0000000..9ecd326
--- /dev/null
+++ b/distribution/benchmark/shutdown.sh
@@ -0,0 +1,63 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+case $1 in
+ producer)
+
+ pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Producer' |grep java | grep -v grep | awk '{print $1}'`
+ if [ -z "$pid" ] ; then
+ echo "No benchmark producer running."
+ exit -1;
+ fi
+
+ echo "The benchmkar producer(${pid}) is running..."
+
+ kill ${pid}
+
+ echo "Send shutdown request to benchmark producer(${pid}) OK"
+ ;;
+ consumer)
+
+ pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Consumer' |grep java | grep -v grep | awk '{print $1}'`
+ if [ -z "$pid" ] ; then
+ echo "No benchmark consumer running."
+ exit -1;
+ fi
+
+ echo "The benchmark consumer(${pid}) is running..."
+
+ kill ${pid}
+
+ echo "Send shutdown request to benchmark consumer(${pid}) OK"
+ ;;
+ tproducer)
+
+ pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.TransactionProducer' |grep java | grep -v grep | awk '{print $1}'`
+ if [ -z "$pid" ] ; then
+ echo "No benchmark transaction producer running."
+ exit -1;
+ fi
+
+ echo "The benchmkar transaction producer(${pid}) is running..."
+
+ kill ${pid}
+
+ echo "Send shutdown request to benchmark transaction producer(${pid}) OK"
+ ;;
+ *)
+ echo "Useage: shutdown producer | consumer | tproducer"
+esac
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index c0a2a8b..b6c6ae4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -52,6 +52,7 @@ public class Consumer {
}
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+ final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20;
final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
@@ -65,8 +66,8 @@ public class Consumer {
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
- System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
- topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
+ System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
+ topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
@@ -101,8 +102,8 @@ public class Consumer {
statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
- System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
- consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
+ System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
+ System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
);
}
}
@@ -123,6 +124,8 @@ public class Consumer {
String ns = commandLine.getOptionValue('n');
consumer.setNamesrvAddr(ns);
}
+ consumer.setConsumeThreadMin(threadCount);
+ consumer.setConsumeThreadMax(threadCount);
consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
if (filterType == null || expression == null) {
@@ -179,6 +182,10 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("w", "threadCount", true, "Thread count, Default: 20");
+ opt.setRequired(false);
+ options.addOption(opt);
+
opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer");
opt.setRequired(false);
options.addOption(opt);
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index dbad169..93271cb 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -89,8 +89,8 @@ public class Producer {
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
- System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+ System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
+ System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 951b718..85af04e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -105,8 +105,8 @@ public class TransactionProducer {
final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;
System.out.printf(
- "Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
+ "Current Time: %s Send TPS:%5d Max RT(ms):%5d AVG RT(ms):%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
+ System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
unexpectedCheck, dupCheck);
statsBenchmark.getSendMessageMaxRT().set(0);
}