You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/27 02:38:41 UTC
[rocketmq] branch develop updated: [ISSUE #6803] Benchmark support reportInterval option (#6804)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e545e3b359 [ISSUE #6803] Benchmark support reportInterval option (#6804)
e545e3b359 is described below
commit e545e3b359d4808537ff2f1f5bb45c04879aec86
Author: DL1231 <53...@users.noreply.github.com>
AuthorDate: Sat May 27 10:38:17 2023 +0800
[ISSUE #6803] Benchmark support reportInterval option (#6804)
---
.../rocketmq/example/benchmark/BatchProducer.java | 19 +++++++++++++++----
.../apache/rocketmq/example/benchmark/Consumer.java | 11 ++++++++---
.../apache/rocketmq/example/benchmark/Producer.java | 11 ++++++++---
.../example/benchmark/TransactionProducer.java | 11 ++++++++---
4 files changed, 39 insertions(+), 13 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index 8771339b76..c4a6162a5f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -74,10 +74,11 @@ public class BatchProducer {
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
+ final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
- "aclEnable: %s%n compressEnable: %s%n",
- topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress);
+ "aclEnable: %s%n compressEnable: %s, reportInterval: %d%n",
+ topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress, reportInterval);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
@@ -85,7 +86,7 @@ public class BatchProducer {
}
msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);
- final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer();
+ final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(reportInterval);
statsBenchmark.start();
RPCHook rpcHook = null;
@@ -253,6 +254,10 @@ public class BatchProducer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -359,6 +364,12 @@ class StatsBenchmarkBatchProducer {
private final LinkedList<Long[]> snapshotList = new LinkedList<>();
+ private final int reportInterval;
+
+ public StatsBenchmarkBatchProducer(int reportInterval) {
+ this.reportInterval = reportInterval;
+ }
+
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
@@ -432,7 +443,7 @@ class StatsBenchmarkBatchProducer {
e.printStackTrace();
}
}
- }, 10000, 10000, TimeUnit.MILLISECONDS);
+ }, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
}
public void shutdown() {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 87388edc9a..57270fcd00 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -68,14 +68,15 @@ public class Consumer {
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
final boolean clientRebalanceEnable = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : true;
+ final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
String group = groupPrefix;
if (Boolean.parseBoolean(isSuffixEnable)) {
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
- System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
- topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
+ System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s, reportInterval: %d%n",
+ topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable, reportInterval);
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
@@ -124,7 +125,7 @@ public class Consumer {
e.printStackTrace();
}
}
- }, 10000, 10000, TimeUnit.MILLISECONDS);
+ }, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
RPCHook rpcHook = null;
if (aclEnable) {
@@ -235,6 +236,10 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index ab474fcf4f..480d16b758 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -82,12 +82,13 @@ public class Producer {
final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
+ final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
"traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
- "asyncEnable: %s%n compressEnable: %s%n",
+ "asyncEnable: %s%n compressEnable: %s, reportInterval: %d%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
- delayEnable, delayLevel, asyncEnable, enableCompress);
+ delayEnable, delayLevel, asyncEnable, enableCompress, reportInterval);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
@@ -139,7 +140,7 @@ public class Producer {
e.printStackTrace();
}
}
- }, 10000, 10000, TimeUnit.MILLISECONDS);
+ }, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
RPCHook rpcHook = null;
if (aclEnable) {
@@ -370,6 +371,10 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index ebe3e01fdc..34cdeb49db 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -79,6 +79,7 @@ public class TransactionProducer {
config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
+ config.reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
@@ -105,8 +106,7 @@ public class TransactionProducer {
Snapshot begin = snapshotList.getFirst();
Snapshot end = snapshotList.getLast();
- final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount)
- + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
+ final long sendCount = end.sendRequestSuccessCount - begin.sendRequestSuccessCount;
final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount);
@@ -131,7 +131,7 @@ public class TransactionProducer {
e.printStackTrace();
}
}
- }, 10000, 10000, TimeUnit.MILLISECONDS);
+ }, config.reportInterval, config.reportInterval, TimeUnit.MILLISECONDS);
RPCHook rpcHook = null;
if (config.aclEnable) {
@@ -291,6 +291,10 @@ public class TransactionProducer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
}
@@ -475,6 +479,7 @@ class TxSendConfig {
int sendInterval;
boolean aclEnable;
boolean msgTraceEnable;
+ int reportInterval;
}
class LRUMap<K, V> extends LinkedHashMap<K, V> {