You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/27 02:38:41 UTC

[rocketmq] branch develop updated: [ISSUE #6803] Benchmark support reportInterval option (#6804)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e545e3b359 [ISSUE #6803] Benchmark support reportInterval option (#6804)
e545e3b359 is described below

commit e545e3b359d4808537ff2f1f5bb45c04879aec86
Author: DL1231 <53...@users.noreply.github.com>
AuthorDate: Sat May 27 10:38:17 2023 +0800

    [ISSUE #6803] Benchmark support reportInterval option (#6804)
---
 .../rocketmq/example/benchmark/BatchProducer.java     | 19 +++++++++++++++----
 .../apache/rocketmq/example/benchmark/Consumer.java   | 11 ++++++++---
 .../apache/rocketmq/example/benchmark/Producer.java   | 11 ++++++++---
 .../example/benchmark/TransactionProducer.java        | 11 ++++++++---
 4 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index 8771339b76..c4a6162a5f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -74,10 +74,11 @@ public class BatchProducer {
         final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
         final boolean aclEnable = getOptionValue(commandLine, 'a', false);
         final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
+        final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
 
         System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
-                "aclEnable: %s%n compressEnable: %s%n",
-            topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress);
+                "aclEnable: %s%n compressEnable: %s, reportInterval: %d%n",
+            topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress, reportInterval);
 
         StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i++) {
@@ -85,7 +86,7 @@ public class BatchProducer {
         }
         msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);
 
-        final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer();
+        final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(reportInterval);
         statsBenchmark.start();
 
         RPCHook rpcHook = null;
@@ -253,6 +254,10 @@ public class BatchProducer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -359,6 +364,12 @@ class StatsBenchmarkBatchProducer {
 
     private final LinkedList<Long[]> snapshotList = new LinkedList<>();
 
+    private final int reportInterval;
+
+    public StatsBenchmarkBatchProducer(int reportInterval) {
+        this.reportInterval = reportInterval;
+    }
+
     public Long[] createSnapshot() {
         Long[] snap = new Long[] {
             System.currentTimeMillis(),
@@ -432,7 +443,7 @@ class StatsBenchmarkBatchProducer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000, TimeUnit.MILLISECONDS);
+        }, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
     }
 
     public void shutdown() {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 87388edc9a..57270fcd00 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -68,14 +68,15 @@ public class Consumer {
         final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
         final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
         final boolean clientRebalanceEnable = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : true;
+        final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
 
         String group = groupPrefix;
         if (Boolean.parseBoolean(isSuffixEnable)) {
             group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
         }
 
-        System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
-            topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
+        System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s, reportInterval: %d%n",
+            topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable, reportInterval);
 
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
 
@@ -124,7 +125,7 @@ public class Consumer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000, TimeUnit.MILLISECONDS);
+        }, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
 
         RPCHook rpcHook = null;
         if (aclEnable) {
@@ -235,6 +236,10 @@ public class Consumer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index ab474fcf4f..480d16b758 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -82,12 +82,13 @@ public class Producer {
         final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
         final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
         final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
+        final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
 
         System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
                 "traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
-                "asyncEnable: %s%n compressEnable: %s%n",
+                "asyncEnable: %s%n compressEnable: %s, reportInterval: %d%n",
             topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
-            delayEnable, delayLevel, asyncEnable, enableCompress);
+            delayEnable, delayLevel, asyncEnable, enableCompress, reportInterval);
 
         StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i++) {
@@ -139,7 +140,7 @@ public class Producer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000, TimeUnit.MILLISECONDS);
+        }, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
 
         RPCHook rpcHook = null;
         if (aclEnable) {
@@ -370,6 +371,10 @@ public class Producer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index ebe3e01fdc..34cdeb49db 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -79,6 +79,7 @@ public class TransactionProducer {
         config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
         config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
         config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
+        config.reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;
 
         final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
 
@@ -105,8 +106,7 @@ public class TransactionProducer {
                     Snapshot begin = snapshotList.getFirst();
                     Snapshot end = snapshotList.getLast();
 
-                    final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount)
-                        + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
+                    final long sendCount = end.sendRequestSuccessCount - begin.sendRequestSuccessCount;
                     final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
                     final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount);
 
@@ -131,7 +131,7 @@ public class TransactionProducer {
                     e.printStackTrace();
                 }
             }
-        }, 10000, 10000, TimeUnit.MILLISECONDS);
+        }, config.reportInterval, config.reportInterval, TimeUnit.MILLISECONDS);
 
         RPCHook rpcHook = null;
         if (config.aclEnable) {
@@ -291,6 +291,10 @@ public class TransactionProducer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("ri", "reportInterval", true, "The number of ms between reports, Default: 10000");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 }
@@ -475,6 +479,7 @@ class TxSendConfig {
     int sendInterval;
     boolean aclEnable;
     boolean msgTraceEnable;
+    int reportInterval;
 }
 
 class LRUMap<K, V> extends LinkedHashMap<K, V> {