You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/21 03:01:03 UTC
[rocketmq] branch develop updated: [ISSUE #4187]Support async publish in producer benchmark (#4188)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 86738365e [ISSUE #4187]Support async publish in producer benchmark (#4188)
86738365e is described below
commit 86738365eccdb49d82a69cdd3ae27286b461e955
Author: cnScarb <jj...@163.com>
AuthorDate: Thu Apr 21 11:00:56 2022 +0800
[ISSUE #4187]Support async publish in producer benchmark (#4188)
* Support async publish in producer benchmark
* Support async publish in producer benchmark
---
.../rocketmq/example/benchmark/Producer.java | 69 +++++++++++++++++-----
1 file changed, 53 insertions(+), 16 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index c32e00e97..12410c015 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.benchmark;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -28,6 +29,9 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
@@ -48,6 +52,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class Producer {
private static byte[] msgBody;
+ private static final int MAX_LENGTH_ASYNC_QUEUE = 10000;
+ private static final int SLEEP_FOR_A_WHILE = 100;
public static void main(String[] args) throws MQClientException {
@@ -58,7 +64,6 @@ public class Producer {
}
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
- final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0;
@@ -68,9 +73,14 @@ public class Producer {
final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0;
final boolean delayEnable = commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d'));
final int delayLevel = commandLine.hasOption('e') ? Integer.parseInt(commandLine.getOptionValue('e')) : 1;
+ final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
+ final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
- System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n",
- topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel);
+ System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d " +
+ "traceEnable: %s aclEnable: %s messageQuantity: %d%ndelayEnable: %s delayLevel: %s%n" +
+ "asyncEnable: %s%n",
+ topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
+ delayEnable, delayLevel, asyncEnable);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
@@ -186,18 +196,26 @@ public class Producer {
startValue += 2;
}
}
- producer.send(msg);
- statsBenchmark.getSendRequestSuccessCount().increment();
- statsBenchmark.getReceiveResponseSuccessCount().increment();
- final long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
- long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
- while (currentRT > prevMaxRT) {
- boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
- if (updated)
- break;
-
- prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
+ if (asyncEnable) {
+ ThreadPoolExecutor e = (ThreadPoolExecutor) producer.getDefaultMQProducerImpl().getAsyncSenderExecutor();
+ // Flow control
+ while (e.getQueue().size() > MAX_LENGTH_ASYNC_QUEUE) {
+ Thread.sleep(SLEEP_FOR_A_WHILE);
+ }
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ updateStatsSuccess(statsBenchmark, beginTimestamp);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ statsBenchmark.getSendRequestFailedCount().increment();
+ }
+ });
+ } else {
+ producer.send(msg);
+ updateStatsSuccess(statsBenchmark, beginTimestamp);
}
} catch (RemotingException e) {
statsBenchmark.getSendRequestFailedCount().increment();
@@ -253,6 +271,21 @@ public class Producer {
}
}
+ private static void updateStatsSuccess(StatsBenchmarkProducer statsBenchmark, long beginTimestamp) {
+ statsBenchmark.getSendRequestSuccessCount().increment();
+ statsBenchmark.getReceiveResponseSuccessCount().increment();
+ final long currentRT = System.currentTimeMillis() - beginTimestamp;
+ statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
+ long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
+ while (currentRT > prevMaxRT) {
+ boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
+ if (updated)
+ break;
+
+ prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
+ }
+ }
+
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64");
opt.setRequired(false);
@@ -302,6 +335,10 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("y", "asyncEnable", true, "Enable async produce, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -322,7 +359,7 @@ public class Producer {
sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
} else {
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
- System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
+ UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
}
}
}