You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/08/26 07:32:08 UTC

[rocketmq] branch develop updated: [ISSUE #3284]Optimizing benchmark code (#3285)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7c7e9ac  [ISSUE #3284]Optimizing benchmark code (#3285)
7c7e9ac is described below

commit 7c7e9aca653703034a29a25211e1b6e806ebf733
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Thu Aug 26 15:31:57 2021 +0800

    [ISSUE #3284]Optimizing benchmark code (#3285)
    
    * [ISSUE #3284]Optimizing benchmark code
    
    * Reduce calls to the longValue method in the loop
    
    Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
 .../rocketmq/example/benchmark/Consumer.java       | 33 +++++------
 .../rocketmq/example/benchmark/Producer.java       | 64 +++++++++++-----------
 2 files changed, 49 insertions(+), 48 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 154e6ed..7d26509 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.example.benchmark;
 
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -155,20 +156,20 @@ public class Consumer {
                 MessageExt msg = msgs.get(0);
                 long now = System.currentTimeMillis();
 
-                statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet();
+                statsBenchmarkConsumer.getReceiveMessageTotalCount().increment();
 
                 long born2ConsumerRT = now - msg.getBornTimestamp();
-                statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT);
+                statsBenchmarkConsumer.getBorn2ConsumerTotalRT().add(born2ConsumerRT);
 
                 long store2ConsumerRT = now - msg.getStoreTimestamp();
-                statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT);
+                statsBenchmarkConsumer.getStore2ConsumerTotalRT().add(store2ConsumerRT);
 
                 compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT);
 
                 compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
 
                 if (ThreadLocalRandom.current().nextDouble() < failRate) {
-                    statsBenchmarkConsumer.getFailCount().incrementAndGet();
+                    statsBenchmarkConsumer.getFailCount().increment();
                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                 } else {
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
@@ -233,39 +234,39 @@ public class Consumer {
 }
 
 class StatsBenchmarkConsumer {
-    private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
+    private final LongAdder receiveMessageTotalCount = new LongAdder();
 
-    private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L);
+    private final LongAdder born2ConsumerTotalRT = new LongAdder();
 
-    private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L);
+    private final LongAdder store2ConsumerTotalRT = new LongAdder();
 
     private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L);
 
     private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
 
-    private final AtomicLong failCount = new AtomicLong(0L);
+    private final LongAdder failCount = new LongAdder();
 
     public Long[] createSnapshot() {
         Long[] snap = new Long[] {
             System.currentTimeMillis(),
-            this.receiveMessageTotalCount.get(),
-            this.born2ConsumerTotalRT.get(),
-            this.store2ConsumerTotalRT.get(),
-            this.failCount.get()
+            this.receiveMessageTotalCount.longValue(),
+            this.born2ConsumerTotalRT.longValue(),
+            this.store2ConsumerTotalRT.longValue(),
+            this.failCount.longValue()
         };
 
         return snap;
     }
 
-    public AtomicLong getReceiveMessageTotalCount() {
+    public LongAdder getReceiveMessageTotalCount() {
         return receiveMessageTotalCount;
     }
 
-    public AtomicLong getBorn2ConsumerTotalRT() {
+    public LongAdder getBorn2ConsumerTotalRT() {
         return born2ConsumerTotalRT;
     }
 
-    public AtomicLong getStore2ConsumerTotalRT() {
+    public LongAdder getStore2ConsumerTotalRT() {
         return store2ConsumerTotalRT;
     }
 
@@ -277,7 +278,7 @@ class StatsBenchmarkConsumer {
         return store2ConsumerMaxRT;
     }
 
-    public AtomicLong getFailCount() {
+    public LongAdder getFailCount() {
         return failCount;
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index b198a0f..cc29994 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.example.benchmark;
 
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -156,8 +157,7 @@ public class Producer {
                                 msg.setDelayTimeLevel(delayLevel);
                             }
                             if (tagCount > 0) {
-                                long sendSucCount = statsBenchmark.getReceiveResponseSuccessCount().get();
-                                msg.setTags(String.format("tag%d", sendSucCount % tagCount));
+                                msg.setTags(String.format("tag%d", System.currentTimeMillis() % tagCount));
                             }
                             if (propertySize > 0) {
                                 if (msg.getProperties() != null) {
@@ -180,20 +180,20 @@ public class Producer {
                                 }
                             }
                             producer.send(msg);
-                            statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
-                            statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
+                            statsBenchmark.getSendRequestSuccessCount().increment();
+                            statsBenchmark.getReceiveResponseSuccessCount().increment();
                             final long currentRT = System.currentTimeMillis() - beginTimestamp;
-                            statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
-                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                            statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
+                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
                             while (currentRT > prevMaxRT) {
                                 boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
                                 if (updated)
                                     break;
 
-                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
                             }
                         } catch (RemotingException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                            statsBenchmark.getSendRequestFailedCount().increment();
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
 
                             try {
@@ -201,16 +201,16 @@ public class Producer {
                             } catch (InterruptedException ignored) {
                             }
                         } catch (InterruptedException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                            statsBenchmark.getSendRequestFailedCount().increment();
                             try {
                                 Thread.sleep(3000);
                             } catch (InterruptedException e1) {
                             }
                         } catch (MQClientException e) {
-                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                            statsBenchmark.getSendRequestFailedCount().increment();
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
                         } catch (MQBrokerException e) {
-                            statsBenchmark.getReceiveResponseFailedCount().incrementAndGet();
+                            statsBenchmark.getReceiveResponseFailedCount().increment();
                             log.error("[BENCHMARK_PRODUCER] Send Exception", e);
                             try {
                                 Thread.sleep(3000);
@@ -237,8 +237,8 @@ public class Producer {
                 doPrintStats(snapshotList, statsBenchmark, true);
             } else {
                 System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n",
-                    statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
-                    statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get());
+                    statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
+                    statsBenchmark.getSendRequestFailedCount().longValue(), statsBenchmark.getReceiveResponseFailedCount().longValue());
             }
             producer.shutdown();
         } catch (InterruptedException e) {
@@ -294,7 +294,7 @@ public class Producer {
         Message msg = new Message();
         msg.setTopic(topic);
 
-        StringBuilder sb = new StringBuilder();
+        StringBuilder sb = new StringBuilder(messageSize);
         for (int i = 0; i < messageSize; i += 10) {
             sb.append("hello baby");
         }
@@ -313,58 +313,58 @@ public class Producer {
 
         if (done) {
             System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
-                statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
-                sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+                statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
+                sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
         } else {
             System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
-                System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+                System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
         }
     }
 }
 
 class StatsBenchmarkProducer {
-    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+    private final LongAdder sendRequestSuccessCount = new LongAdder();
 
-    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+    private final LongAdder sendRequestFailedCount = new LongAdder();
 
-    private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
+    private final LongAdder receiveResponseSuccessCount = new LongAdder();
 
-    private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
+    private final LongAdder receiveResponseFailedCount = new LongAdder();
 
-    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+    private final LongAdder sendMessageSuccessTimeTotal = new LongAdder();
 
     private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
 
     public Long[] createSnapshot() {
         Long[] snap = new Long[] {
             System.currentTimeMillis(),
-            this.sendRequestSuccessCount.get(),
-            this.sendRequestFailedCount.get(),
-            this.receiveResponseSuccessCount.get(),
-            this.receiveResponseFailedCount.get(),
-            this.sendMessageSuccessTimeTotal.get(),
+            this.sendRequestSuccessCount.longValue(),
+            this.sendRequestFailedCount.longValue(),
+            this.receiveResponseSuccessCount.longValue(),
+            this.receiveResponseFailedCount.longValue(),
+            this.sendMessageSuccessTimeTotal.longValue(),
         };
 
         return snap;
     }
 
-    public AtomicLong getSendRequestSuccessCount() {
+    public LongAdder getSendRequestSuccessCount() {
         return sendRequestSuccessCount;
     }
 
-    public AtomicLong getSendRequestFailedCount() {
+    public LongAdder getSendRequestFailedCount() {
         return sendRequestFailedCount;
     }
 
-    public AtomicLong getReceiveResponseSuccessCount() {
+    public LongAdder getReceiveResponseSuccessCount() {
         return receiveResponseSuccessCount;
     }
 
-    public AtomicLong getReceiveResponseFailedCount() {
+    public LongAdder getReceiveResponseFailedCount() {
         return receiveResponseFailedCount;
     }
 
-    public AtomicLong getSendMessageSuccessTimeTotal() {
+    public LongAdder getSendMessageSuccessTimeTotal() {
         return sendMessageSuccessTimeTotal;
     }