You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ol...@apache.org on 2023/03/14 13:08:02 UTC
[rocketmq] branch develop updated: [ISSUE #6215]make benchmark cover compress msg situation (#6216)
This is an automated email from the ASF dual-hosted git repository.
oliverwqcwrw pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1c98a2316f [ISSUE #6215]make benchmark cover compress msg situation (#6216)
1c98a2316f is described below
commit 1c98a2316f95381a44b15845542d8b58b5994f18
Author: Humkum <11...@qq.com>
AuthorDate: Tue Mar 14 21:07:51 2023 +0800
[ISSUE #6215]make benchmark cover compress msg situation (#6216)
---
.../rocketmq/example/benchmark/BatchProducer.java | 38 ++++++++++++++++++++--
.../rocketmq/example/benchmark/Producer.java | 34 +++++++++++++++++--
2 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index bfec3c2f59..8771339b76 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -72,9 +73,11 @@ public class BatchProducer {
final int tagCount = getOptionValue(commandLine, 'l', 0);
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
+ final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
- System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, aclEnable: %s%n",
- topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
+ System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
+ "aclEnable: %s%n compressEnable: %s%n",
+ topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
@@ -93,6 +96,19 @@ public class BatchProducer {
}
final DefaultMQProducer producer = initInstance(namesrv, msgTraceEnable, rpcHook);
+
+ if (enableCompress) {
+ String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
+ int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
+ int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
+ producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
+ producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+ producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
+ System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
+ } else {
+ producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+ }
+
producer.start();
final Logger logger = LoggerFactory.getLogger(BatchProducer.class);
@@ -220,6 +236,23 @@ public class BatchProducer {
opt = new Option("n", "namesrv", true, "name server, Default: 127.0.0.1:9876");
opt.setRequired(false);
options.addOption(opt);
+
+ opt = new Option("c", "compressEnable", true, "Enable compress msg over 4K, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("ct", "compressType", true, "Message compressed type, Default: ZLIB");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("cl", "compressLevel", true, "Message compressed level, Default: 5");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("ch", "compressOverHowMuch", true, "Compress message when body over how much(unit Byte), Default: 4096");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -303,7 +336,6 @@ public class BatchProducer {
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setNamesrvAddr(namesrv);
- producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
return producer;
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 1114c2c343..ab474fcf4f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -80,12 +81,13 @@ public class Producer {
final int delayLevel = commandLine.hasOption('e') ? Integer.parseInt(commandLine.getOptionValue('e')) : 1;
final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
+ final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
"traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
- "asyncEnable: %s%n",
+ "asyncEnable: %s%n compressEnable: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
- delayEnable, delayLevel, asyncEnable);
+ delayEnable, delayLevel, asyncEnable, enableCompress);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
@@ -153,7 +155,17 @@ public class Producer {
producer.setNamesrvAddr(ns);
}
- producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+ if (enableCompress) {
+ String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
+ int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
+ int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
+ producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
+ producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+ producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
+ System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
+ } else {
+ producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+ }
producer.start();
@@ -342,6 +354,22 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("c", "compressEnable", true, "Enable compress msg over 4K, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("ct", "compressType", true, "Message compressed type, Default: ZLIB");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("cl", "compressLevel", true, "Message compressed level, Default: 5");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("ch", "compressOverHowMuch", true, "Compress message when body over how much(unit Byte), Default: 4096");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}