You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/04 22:23:47 UTC

[rocketmq] branch develop updated: [ISSUE #3284]Optimizing benchmark code (#3317)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 857d28d  [ISSUE #3284]Optimizing benchmark code (#3317)
857d28d is described below

commit 857d28df5e6dfe697fd01d86bcc1a48c0bdef234
Author: 【keepal】 <97...@qq.com>
AuthorDate: Sun Sep 5 06:23:32 2021 +0800

    [ISSUE #3284]Optimizing benchmark code (#3317)
---
 .../rocketmq/example/benchmark/BatchProducer.java  | 68 +++++++++++-----------
 .../example/benchmark/TransactionProducer.java     | 57 +++++++++---------
 2 files changed, 65 insertions(+), 60 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index 843b84b..f3e8b60 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -26,6 +26,8 @@ import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -100,22 +102,22 @@ public class BatchProducer {
 
                         try {
                             long beginTimestamp = System.currentTimeMillis();
-                            long sendSucCount = statsBenchmark.getSendMessageSuccessCount().get();
+                            long sendSucCount = statsBenchmark.getSendMessageSuccessCount().longValue();
 
                             setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000));
                             setTags(tagCount, msgs, sendSucCount);
                             setProperties(propertySize, msgs);
                             SendResult sendResult = producer.send(msgs);
                             if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
-                                statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
-                                statsBenchmark.getSendMessageSuccessCount().addAndGet(msgs.size());
+                                statsBenchmark.getSendRequestSuccessCount().increment();
+                                statsBenchmark.getSendMessageSuccessCount().add(msgs.size());
                             } else {
-                                statsBenchmark.getSendRequestFailedCount().incrementAndGet();
-                                statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+                                statsBenchmark.getSendRequestFailedCount().increment();
+                                statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                             }
                             long currentRT = System.currentTimeMillis() - beginTimestamp;
-                            statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
-                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                            statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
+                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
                             while (currentRT > prevMaxRT) {
                                 boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
                                 if (updated) {
@@ -125,8 +127,8 @@ public class BatchProducer {
                                 prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                             }
                         } catch (RemotingException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
-                            statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+                            statsBenchmark.getSendRequestFailedCount().increment();
+                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
 
                             try {
@@ -134,22 +136,22 @@ public class BatchProducer {
                             } catch (InterruptedException ignored) {
                             }
                         } catch (InterruptedException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
-                            statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+                            statsBenchmark.getSendRequestFailedCount().increment();
+                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                             try {
                                 Thread.sleep(3000);
                             } catch (InterruptedException e1) {
                             }
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
-                            statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+                            statsBenchmark.getSendRequestFailedCount().increment();
+                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
                         } catch (MQClientException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
-                            statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+                            statsBenchmark.getSendRequestFailedCount().increment();
+                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
                         } catch (MQBrokerException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
-                            statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+                            statsBenchmark.getSendRequestFailedCount().increment();
+                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
                             try {
                                 Thread.sleep(3000);
@@ -313,17 +315,17 @@ public class BatchProducer {
 
 class StatsBenchmarkBatchProducer {
 
-    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+    private final LongAdder sendRequestSuccessCount = new LongAdder();
 
-    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+    private final LongAdder sendRequestFailedCount = new LongAdder();
 
-    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+    private final LongAdder sendMessageSuccessTimeTotal = new LongAdder();
 
     private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
 
-    private final AtomicLong sendMessageSuccessCount = new AtomicLong(0L);
+    private final LongAdder sendMessageSuccessCount = new LongAdder();
 
-    private final AtomicLong sendMessageFailedCount = new AtomicLong(0L);
+    private final LongAdder sendMessageFailedCount = new LongAdder();
 
     private final Timer timer = new Timer("BenchmarkTimerThread", true);
 
@@ -332,25 +334,25 @@ class StatsBenchmarkBatchProducer {
     public Long[] createSnapshot() {
         Long[] snap = new Long[] {
                 System.currentTimeMillis(),
-                this.sendRequestSuccessCount.get(),
-                this.sendRequestFailedCount.get(),
-                this.sendMessageSuccessCount.get(),
-                this.sendMessageFailedCount.get(),
-                this.sendMessageSuccessTimeTotal.get(),
+                this.sendRequestSuccessCount.longValue(),
+                this.sendRequestFailedCount.longValue(),
+                this.sendMessageSuccessCount.longValue(),
+                this.sendMessageFailedCount.longValue(),
+                this.sendMessageSuccessTimeTotal.longValue(),
         };
 
         return snap;
     }
 
-    public AtomicLong getSendRequestSuccessCount() {
+    public LongAdder getSendRequestSuccessCount() {
         return sendRequestSuccessCount;
     }
 
-    public AtomicLong getSendRequestFailedCount() {
+    public LongAdder getSendRequestFailedCount() {
         return sendRequestFailedCount;
     }
 
-    public AtomicLong getSendMessageSuccessTimeTotal() {
+    public LongAdder getSendMessageSuccessTimeTotal() {
         return sendMessageSuccessTimeTotal;
     }
 
@@ -358,11 +360,11 @@ class StatsBenchmarkBatchProducer {
         return sendMessageMaxRT;
     }
 
-    public AtomicLong getSendMessageSuccessCount() {
+    public LongAdder getSendMessageSuccessCount() {
         return sendMessageSuccessCount;
     }
 
-    public AtomicLong getSendMessageFailedCount() {
+    public LongAdder getSendMessageFailedCount() {
         return sendMessageFailedCount;
     }
 
@@ -390,7 +392,7 @@ class StatsBenchmarkBatchProducer {
                     final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
 
                     System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n",
-                            System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().get(), averageRT, averageMsgRT, end[2], end[4]);
+                            System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
                 }
             }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index c4f14a4..767a96b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -50,10 +50,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 
 public class TransactionProducer {
     private static final long START_TIME = System.currentTimeMillis();
-    private static final AtomicLong MSG_COUNT = new AtomicLong(0);
+    private static final LongAdder MSG_COUNT = new LongAdder();
 
     //broker max check times should less than this value
     static final int MAX_CHECK_RESULT_IN_MSG = 20;
@@ -158,7 +159,7 @@ public class TransactionProducer {
                             success = false;
                         } finally {
                             final long currentRT = System.currentTimeMillis() - beginTimestamp;
-                            statsBenchmark.getSendMessageTimeTotal().addAndGet(currentRT);
+                            statsBenchmark.getSendMessageTimeTotal().add(currentRT);
                             long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                             while (currentRT > prevMaxRT) {
                                 boolean updated = statsBenchmark.getSendMessageMaxRT()
@@ -169,9 +170,9 @@ public class TransactionProducer {
                                 prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                             }
                             if (success) {
-                                statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+                                statsBenchmark.getSendRequestSuccessCount().increment();
                             } else {
-                                statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                                statsBenchmark.getSendRequestFailedCount().increment();
                             }
                             if (config.sendInterval > 0) {
                                 try {
@@ -194,7 +195,9 @@ public class TransactionProducer {
         ByteBuffer buf = ByteBuffer.wrap(bs);
         buf.putLong(config.batchId);
         long sendMachineId = START_TIME << 32;
-        long msgId = sendMachineId | MSG_COUNT.getAndIncrement();
+        long count = MSG_COUNT.longValue();
+        long msgId = sendMachineId | count;
+        MSG_COUNT.increment();
         buf.putLong(msgId);
 
         // save send tx result in message
@@ -316,7 +319,7 @@ class TransactionListenerImpl implements TransactionListener {
             // message not generated in this test
             return LocalTransactionState.ROLLBACK_MESSAGE;
         }
-        statBenchmark.getCheckCount().incrementAndGet();
+        statBenchmark.getCheckCount().increment();
 
         int times = 0;
         try {
@@ -339,7 +342,7 @@ class TransactionListenerImpl implements TransactionListener {
             dup = newCheckLog.equals(oldCheckLog);
         }
         if (dup) {
-            statBenchmark.getDuplicatedCheckCount().incrementAndGet();
+            statBenchmark.getDuplicatedCheckCount().increment();
         }
         if (msgMeta.sendResult != LocalTransactionState.UNKNOW) {
             System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n",
@@ -347,7 +350,7 @@ class TransactionListenerImpl implements TransactionListener {
                     msg.getMsgId(), msg.getTransactionId(),
                     msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
                     msgMeta.sendResult.toString());
-            statBenchmark.getUnexpectedCheckCount().incrementAndGet();
+            statBenchmark.getUnexpectedCheckCount().increment();
             return msgMeta.sendResult;
         }
 
@@ -358,7 +361,7 @@ class TransactionListenerImpl implements TransactionListener {
                         new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
                         msg.getMsgId(), msg.getTransactionId(),
                         msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
-                statBenchmark.getUnexpectedCheckCount().incrementAndGet();
+                statBenchmark.getUnexpectedCheckCount().increment();
                 return s;
             }
         }
@@ -385,42 +388,42 @@ class Snapshot {
 }
 
 class StatsBenchmarkTProducer {
-    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+    private final LongAdder sendRequestSuccessCount = new LongAdder();
 
-    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+    private final LongAdder sendRequestFailedCount = new LongAdder();
 
-    private final AtomicLong sendMessageTimeTotal = new AtomicLong(0L);
+    private final LongAdder sendMessageTimeTotal = new LongAdder();
 
     private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
 
-    private final AtomicLong checkCount = new AtomicLong(0L);
+    private final LongAdder checkCount = new LongAdder();
 
-    private final AtomicLong unexpectedCheckCount = new AtomicLong(0L);
+    private final LongAdder unexpectedCheckCount = new LongAdder();
 
-    private final AtomicLong duplicatedCheckCount = new AtomicLong(0);
+    private final LongAdder duplicatedCheckCount = new LongAdder();
 
     public Snapshot createSnapshot() {
         Snapshot s = new Snapshot();
         s.endTime = System.currentTimeMillis();
-        s.sendRequestSuccessCount = sendRequestSuccessCount.get();
-        s.sendRequestFailedCount = sendRequestFailedCount.get();
-        s.sendMessageTimeTotal = sendMessageTimeTotal.get();
+        s.sendRequestSuccessCount = sendRequestSuccessCount.longValue();
+        s.sendRequestFailedCount = sendRequestFailedCount.longValue();
+        s.sendMessageTimeTotal = sendMessageTimeTotal.longValue();
         s.sendMessageMaxRT = sendMessageMaxRT.get();
-        s.checkCount = checkCount.get();
-        s.unexpectedCheckCount = unexpectedCheckCount.get();
-        s.duplicatedCheck = duplicatedCheckCount.get();
+        s.checkCount = checkCount.longValue();
+        s.unexpectedCheckCount = unexpectedCheckCount.longValue();
+        s.duplicatedCheck = duplicatedCheckCount.longValue();
         return s;
     }
 
-    public AtomicLong getSendRequestSuccessCount() {
+    public LongAdder getSendRequestSuccessCount() {
         return sendRequestSuccessCount;
     }
 
-    public AtomicLong getSendRequestFailedCount() {
+    public LongAdder getSendRequestFailedCount() {
         return sendRequestFailedCount;
     }
 
-    public AtomicLong getSendMessageTimeTotal() {
+    public LongAdder getSendMessageTimeTotal() {
         return sendMessageTimeTotal;
     }
 
@@ -428,15 +431,15 @@ class StatsBenchmarkTProducer {
         return sendMessageMaxRT;
     }
 
-    public AtomicLong getCheckCount() {
+    public LongAdder getCheckCount() {
         return checkCount;
     }
 
-    public AtomicLong getUnexpectedCheckCount() {
+    public LongAdder getUnexpectedCheckCount() {
         return unexpectedCheckCount;
     }
 
-    public AtomicLong getDuplicatedCheckCount() {
+    public LongAdder getDuplicatedCheckCount() {
         return duplicatedCheckCount;
     }
 }