You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ol...@apache.org on 2023/03/14 13:08:02 UTC

[rocketmq] branch develop updated: [ISSUE #6215]make benchmark cover compress msg situation (#6216)

This is an automated email from the ASF dual-hosted git repository.

oliverwqcwrw pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1c98a2316f [ISSUE #6215]make benchmark cover compress msg situation (#6216)
1c98a2316f is described below

commit 1c98a2316f95381a44b15845542d8b58b5994f18
Author: Humkum <11...@qq.com>
AuthorDate: Tue Mar 14 21:07:51 2023 +0800

    [ISSUE #6215]make benchmark cover compress msg situation (#6216)
---
 .../rocketmq/example/benchmark/BatchProducer.java  | 38 ++++++++++++++++++++--
 .../rocketmq/example/benchmark/Producer.java       | 34 +++++++++++++++++--
 2 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index bfec3c2f59..8771339b76 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.compression.CompressionType;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -72,9 +73,11 @@ public class BatchProducer {
         final int tagCount = getOptionValue(commandLine, 'l', 0);
         final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
         final boolean aclEnable = getOptionValue(commandLine, 'a', false);
+        final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
 
-        System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, aclEnable: %s%n",
-            topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
+        System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
+                "aclEnable: %s%n compressEnable: %s%n",
+            topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress);
 
         StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i++) {
@@ -93,6 +96,19 @@ public class BatchProducer {
         }
 
         final DefaultMQProducer producer = initInstance(namesrv, msgTraceEnable, rpcHook);
+
+        if (enableCompress) {
+            String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
+            int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
+            int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
+            producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
+            producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+            producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
+            System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
+        } else {
+            producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+        }
+
         producer.start();
 
         final Logger logger = LoggerFactory.getLogger(BatchProducer.class);
@@ -220,6 +236,23 @@ public class BatchProducer {
         opt = new Option("n", "namesrv", true, "name server, Default: 127.0.0.1:9876");
         opt.setRequired(false);
         options.addOption(opt);
+
+        opt = new Option("c", "compressEnable", true, "Enable compress msg over 4K, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("ct", "compressType", true, "Message compressed type, Default: ZLIB");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("cl", "compressLevel", true, "Message compressed level, Default: 5");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("ch", "compressOverHowMuch", true, "Compress message when body over how much(unit Byte), Default: 4096");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -303,7 +336,6 @@ public class BatchProducer {
         producer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         producer.setNamesrvAddr(namesrv);
-        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
         return producer;
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 1114c2c343..ab474fcf4f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.compression.CompressionType;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -80,12 +81,13 @@ public class Producer {
         final int delayLevel = commandLine.hasOption('e') ? Integer.parseInt(commandLine.getOptionValue('e')) : 1;
         final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
         final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
+        final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
 
         System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
                 "traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
-                "asyncEnable: %s%n",
+                "asyncEnable: %s%n compressEnable: %s%n",
             topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
-            delayEnable, delayLevel, asyncEnable);
+            delayEnable, delayLevel, asyncEnable, enableCompress);
 
         StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i++) {
@@ -153,7 +155,17 @@ public class Producer {
             producer.setNamesrvAddr(ns);
         }
 
-        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+        if (enableCompress) {
+            String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
+            int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
+            int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
+            producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
+            producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+            producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
+            System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
+        } else {
+            producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+        }
 
         producer.start();
 
@@ -342,6 +354,22 @@ public class Producer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("c", "compressEnable", true, "Enable compress msg over 4K, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("ct", "compressType", true, "Message compressed type, Default: ZLIB");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("cl", "compressLevel", true, "Message compressed level, Default: 5");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("ch", "compressOverHowMuch", true, "Compress message when body over how much(unit Byte), Default: 4096");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }