You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/09 11:33:47 UTC

[rocketmq] branch develop updated: [ISSUE #4487]Optimize the output of pressure results (#5282)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f4ff22564 [ISSUE #4487]Optimize the output of pressure results (#5282)
f4ff22564 is described below

commit f4ff225649cd3a106a0e3da408355a3f0676d194
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Wed Nov 9 19:33:41 2022 +0800

    [ISSUE #4487]Optimize the output of pressure results (#5282)
    
    Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
 .../rocketmq/example/benchmark/BatchProducer.java  |  9 ++---
 .../rocketmq/example/benchmark/Consumer.java       |  8 ++---
 .../rocketmq/example/benchmark/Producer.java       |  8 ++---
 .../example/benchmark/TransactionProducer.java     | 39 +++++++++++-----------
 4 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index 098dc11bb..23e922766 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -72,8 +73,8 @@ public class BatchProducer {
         final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
         final boolean aclEnable = getOptionValue(commandLine, 'a', false);
 
-        System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n",
-                topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
+        System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, aclEnable: %s%n",
+            topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
 
         StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i++) {
@@ -386,8 +387,8 @@ class StatsBenchmarkBatchProducer {
                     final double averageRT = (end[5] - begin[5]) / (double) (end[1] - begin[1]);
                     final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
 
-                    System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n",
-                            System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
+                    System.out.printf("Current Time: %s | Send TPS: %d | Send MPS: %d | Max RT(ms): %d | Average RT(ms): %7.3f | Average Message RT(ms): %7.3f | Send Failed: %d | Send Message Failed: %d%n",
+                        UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
                 }
             }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 23a272bfc..87388edc9 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -79,7 +80,7 @@ public class Consumer {
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
 
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
-                new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
+            new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
 
         final LinkedList<Long[]> snapshotList = new LinkedList<>();
 
@@ -110,9 +111,8 @@ public class Consumer {
                     statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
                     statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
 
-                    System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
-                            System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
-                    );
+                    System.out.printf("Current Time: %s | Consume TPS: %d | AVG(B2C) RT(ms): %7.3f | AVG(S2C) RT(ms): %7.3f | MAX(B2C) RT(ms): %d | MAX(S2C) RT(ms): %d | Consume Fail: %d%n",
+                        UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), consumeTps, averageB2CRT, averageS2CRT, b2cMax, s2cMax, failCount);
                 }
             }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 7112c89a1..24266a7b1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -79,8 +79,8 @@ public class Producer {
         final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
         final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
 
-        System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d " +
-                "traceEnable: %s aclEnable: %s messageQuantity: %d%ndelayEnable: %s delayLevel: %s%n" +
+        System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
+                "traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
                 "asyncEnable: %s%n",
             topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
             delayEnable, delayLevel, asyncEnable);
@@ -357,11 +357,11 @@ public class Producer {
         final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
 
         if (done) {
-            System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
+            System.out.printf("[Complete] Send Total: %d | Send TPS: %d | Max RT(ms): %d | Average RT(ms): %7.3f | Send Failed: %d | Response Failed: %d%n",
                 statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
                 sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
         } else {
-            System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
+            System.out.printf("Current Time: %s | Send TPS: %d | Max RT(ms): %d | Average RT(ms): %7.3f | Send Failed: %d | Response Failed: %d%n",
                 UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
         }
     }
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 993224e3d..ebe3e01fd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -84,7 +85,7 @@ public class TransactionProducer {
         final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
 
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
-                new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
+            new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
 
         final LinkedList<Snapshot> snapshotList = new LinkedList<>();
 
@@ -105,7 +106,7 @@ public class TransactionProducer {
                     Snapshot end = snapshotList.getLast();
 
                     final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount)
-                            + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
+                        + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
                     final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
                     final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount);
 
@@ -115,9 +116,9 @@ public class TransactionProducer {
                     final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;
 
                     System.out.printf(
-                        "Current Time: %s Send TPS:%5d Max RT(ms):%5d AVG RT(ms):%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
-                            System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
-                            unexpectedCheck, dupCheck);
+                        "Current Time: %s | Send TPS: %5d | Max RT(ms): %5d | AVG RT(ms): %3.1f | Send Failed: %d | Check: %d | UnexpectedCheck: %d | DuplicatedCheck: %d%n",
+                        UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
+                        unexpectedCheck, dupCheck);
                     statsBenchmark.getSendMessageMaxRT().set(0);
                 }
             }
@@ -140,11 +141,11 @@ public class TransactionProducer {
         }
         final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
         final TransactionMQProducer producer = new TransactionMQProducer(
-                null,
-                "benchmark_transaction_producer",
-                rpcHook,
-                config.msgTraceEnable,
-                null);
+            null,
+            "benchmark_transaction_producer",
+            rpcHook,
+            config.msgTraceEnable,
+            null);
         producer.setInstanceName(Long.toString(System.currentTimeMillis()));
         producer.setTransactionListener(transactionCheckListener);
         producer.setDefaultTopicQueueNums(1000);
@@ -163,7 +164,7 @@ public class TransactionProducer {
                         final long beginTimestamp = System.currentTimeMillis();
                         try {
                             SendResult sendResult =
-                                    producer.sendMessageInTransaction(buildMessage(config), null);
+                                producer.sendMessageInTransaction(buildMessage(config), null);
                             success = sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK;
                         } catch (Throwable e) {
                             success = false;
@@ -173,7 +174,7 @@ public class TransactionProducer {
                             long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                             while (currentRT > prevMaxRT) {
                                 boolean updated = statsBenchmark.getSendMessageMaxRT()
-                                        .compareAndSet(prevMaxRT, currentRT);
+                                    .compareAndSet(prevMaxRT, currentRT);
                                 if (updated)
                                     break;
 
@@ -364,10 +365,10 @@ class TransactionListenerImpl implements TransactionListener {
         }
         if (msgMeta.sendResult != LocalTransactionState.UNKNOW) {
             System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n",
-                    new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
-                    msg.getMsgId(), msg.getTransactionId(),
-                    msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
-                    msgMeta.sendResult.toString());
+                new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
+                msg.getMsgId(), msg.getTransactionId(),
+                msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
+                msgMeta.sendResult.toString());
             statBenchmark.getUnexpectedCheckCount().increment();
             return msgMeta.sendResult;
         }
@@ -376,9 +377,9 @@ class TransactionListenerImpl implements TransactionListener {
             LocalTransactionState s = msgMeta.checkResult.get(i);
             if (s != LocalTransactionState.UNKNOW) {
                 System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult,lastCheckResult=%s\n",
-                        new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
-                        msg.getMsgId(), msg.getTransactionId(),
-                        msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
+                    new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
+                    msg.getMsgId(), msg.getTransactionId(),
+                    msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
                 statBenchmark.getUnexpectedCheckCount().increment();
                 return s;
             }