You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/21 03:01:03 UTC

[rocketmq] branch develop updated: [ISSUE #4187]Support async publish in producer benchmark (#4188)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 86738365e [ISSUE #4187]Support async publish in producer benchmark (#4188)
86738365e is described below

commit 86738365eccdb49d82a69cdd3ae27286b461e955
Author: cnScarb <jj...@163.com>
AuthorDate: Thu Apr 21 11:00:56 2022 +0800

    [ISSUE #4187]Support async publish in producer benchmark (#4188)
    
    * Support async publish in producer benchmark
    
    * Support async publish in producer benchmark
---
 .../rocketmq/example/benchmark/Producer.java       | 69 +++++++++++++++++-----
 1 file changed, 53 insertions(+), 16 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index c32e00e97..12410c015 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.example.benchmark;
 
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -28,6 +29,9 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -48,6 +52,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Producer {
 
     private static byte[] msgBody;
+    private static final int MAX_LENGTH_ASYNC_QUEUE = 10000;
+    private static final int SLEEP_FOR_A_WHILE = 100;
 
     public static void main(String[] args) throws MQClientException {
 
@@ -58,7 +64,6 @@ public class Producer {
         }
 
         final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
-        final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
         final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
         final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
         final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0;
@@ -68,9 +73,14 @@ public class Producer {
         final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0;
         final boolean delayEnable = commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d'));
         final int delayLevel = commandLine.hasOption('e') ? Integer.parseInt(commandLine.getOptionValue('e')) : 1;
+        final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
+        final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
 
-        System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n",
-            topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel);
+        System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d " +
+                "traceEnable: %s aclEnable: %s messageQuantity: %d%ndelayEnable: %s delayLevel: %s%n" +
+                "asyncEnable: %s%n",
+            topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
+            delayEnable, delayLevel, asyncEnable);
 
         StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i++) {
@@ -186,18 +196,26 @@ public class Producer {
                                     startValue += 2;
                                 }
                             }
-                            producer.send(msg);
-                            statsBenchmark.getSendRequestSuccessCount().increment();
-                            statsBenchmark.getReceiveResponseSuccessCount().increment();
-                            final long currentRT = System.currentTimeMillis() - beginTimestamp;
-                            statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
-                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
-                            while (currentRT > prevMaxRT) {
-                                boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
-                                if (updated)
-                                    break;
-
-                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
+                            if (asyncEnable) {
+                                ThreadPoolExecutor e = (ThreadPoolExecutor) producer.getDefaultMQProducerImpl().getAsyncSenderExecutor();
+                                // Flow control
+                                while (e.getQueue().size() > MAX_LENGTH_ASYNC_QUEUE) {
+                                    Thread.sleep(SLEEP_FOR_A_WHILE);
+                                }
+                                producer.send(msg, new SendCallback() {
+                                    @Override
+                                    public void onSuccess(SendResult sendResult) {
+                                        updateStatsSuccess(statsBenchmark, beginTimestamp);
+                                    }
+
+                                    @Override
+                                    public void onException(Throwable e) {
+                                        statsBenchmark.getSendRequestFailedCount().increment();
+                                    }
+                                });
+                            } else {
+                                producer.send(msg);
+                                updateStatsSuccess(statsBenchmark, beginTimestamp);
                             }
                         } catch (RemotingException e) {
                             statsBenchmark.getSendRequestFailedCount().increment();
@@ -253,6 +271,21 @@ public class Producer {
         }
     }
 
+    private static void updateStatsSuccess(StatsBenchmarkProducer statsBenchmark, long beginTimestamp) {
+        statsBenchmark.getSendRequestSuccessCount().increment();
+        statsBenchmark.getReceiveResponseSuccessCount().increment();
+        final long currentRT = System.currentTimeMillis() - beginTimestamp;
+        statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
+        long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
+        while (currentRT > prevMaxRT) {
+            boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
+            if (updated)
+                break;
+
+            prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
+        }
+    }
+
     public static Options buildCommandlineOptions(final Options options) {
         Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64");
         opt.setRequired(false);
@@ -302,6 +335,10 @@ public class Producer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("y", "asyncEnable", true, "Enable async produce, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -322,7 +359,7 @@ public class Producer {
                 sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
         } else {
             System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
-                System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
+                UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
         }
     }
 }