You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/08/26 07:32:08 UTC
[rocketmq] branch develop updated: [ISSUE #3284]Optimizing benchmark code (#3285)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7c7e9ac [ISSUE #3284]Optimizing benchmark code (#3285)
7c7e9ac is described below
commit 7c7e9aca653703034a29a25211e1b6e806ebf733
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Thu Aug 26 15:31:57 2021 +0800
[ISSUE #3284]Optimizing benchmark code (#3285)
* [ISSUE #3284]Optimizing benchmark code
* Reduce calls to the longValue method in the loop
Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
.../rocketmq/example/benchmark/Consumer.java | 33 +++++------
.../rocketmq/example/benchmark/Producer.java | 64 +++++++++++-----------
2 files changed, 49 insertions(+), 48 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 154e6ed..7d26509 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.benchmark;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -155,20 +156,20 @@ public class Consumer {
MessageExt msg = msgs.get(0);
long now = System.currentTimeMillis();
- statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet();
+ statsBenchmarkConsumer.getReceiveMessageTotalCount().increment();
long born2ConsumerRT = now - msg.getBornTimestamp();
- statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT);
+ statsBenchmarkConsumer.getBorn2ConsumerTotalRT().add(born2ConsumerRT);
long store2ConsumerRT = now - msg.getStoreTimestamp();
- statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT);
+ statsBenchmarkConsumer.getStore2ConsumerTotalRT().add(store2ConsumerRT);
compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT);
compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
if (ThreadLocalRandom.current().nextDouble() < failRate) {
- statsBenchmarkConsumer.getFailCount().incrementAndGet();
+ statsBenchmarkConsumer.getFailCount().increment();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
@@ -233,39 +234,39 @@ public class Consumer {
}
class StatsBenchmarkConsumer {
- private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
+ private final LongAdder receiveMessageTotalCount = new LongAdder();
- private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L);
+ private final LongAdder born2ConsumerTotalRT = new LongAdder();
- private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L);
+ private final LongAdder store2ConsumerTotalRT = new LongAdder();
private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L);
private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
- private final AtomicLong failCount = new AtomicLong(0L);
+ private final LongAdder failCount = new LongAdder();
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
- this.receiveMessageTotalCount.get(),
- this.born2ConsumerTotalRT.get(),
- this.store2ConsumerTotalRT.get(),
- this.failCount.get()
+ this.receiveMessageTotalCount.longValue(),
+ this.born2ConsumerTotalRT.longValue(),
+ this.store2ConsumerTotalRT.longValue(),
+ this.failCount.longValue()
};
return snap;
}
- public AtomicLong getReceiveMessageTotalCount() {
+ public LongAdder getReceiveMessageTotalCount() {
return receiveMessageTotalCount;
}
- public AtomicLong getBorn2ConsumerTotalRT() {
+ public LongAdder getBorn2ConsumerTotalRT() {
return born2ConsumerTotalRT;
}
- public AtomicLong getStore2ConsumerTotalRT() {
+ public LongAdder getStore2ConsumerTotalRT() {
return store2ConsumerTotalRT;
}
@@ -277,7 +278,7 @@ class StatsBenchmarkConsumer {
return store2ConsumerMaxRT;
}
- public AtomicLong getFailCount() {
+ public LongAdder getFailCount() {
return failCount;
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index b198a0f..cc29994 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.benchmark;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -156,8 +157,7 @@ public class Producer {
msg.setDelayTimeLevel(delayLevel);
}
if (tagCount > 0) {
- long sendSucCount = statsBenchmark.getReceiveResponseSuccessCount().get();
- msg.setTags(String.format("tag%d", sendSucCount % tagCount));
+ msg.setTags(String.format("tag%d", System.currentTimeMillis() % tagCount));
}
if (propertySize > 0) {
if (msg.getProperties() != null) {
@@ -180,20 +180,20 @@ public class Producer {
}
}
producer.send(msg);
- statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
- statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
+ statsBenchmark.getSendRequestSuccessCount().increment();
+ statsBenchmark.getReceiveResponseSuccessCount().increment();
final long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
- long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+ statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
+ long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
if (updated)
break;
- prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+ prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
}
} catch (RemotingException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ statsBenchmark.getSendRequestFailedCount().increment();
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
@@ -201,16 +201,16 @@ public class Producer {
} catch (InterruptedException ignored) {
}
} catch (InterruptedException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ statsBenchmark.getSendRequestFailedCount().increment();
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
}
} catch (MQClientException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ statsBenchmark.getSendRequestFailedCount().increment();
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQBrokerException e) {
- statsBenchmark.getReceiveResponseFailedCount().incrementAndGet();
+ statsBenchmark.getReceiveResponseFailedCount().increment();
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
@@ -237,8 +237,8 @@ public class Producer {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n",
- statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
- statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get());
+ statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
+ statsBenchmark.getSendRequestFailedCount().longValue(), statsBenchmark.getReceiveResponseFailedCount().longValue());
}
producer.shutdown();
} catch (InterruptedException e) {
@@ -294,7 +294,7 @@ public class Producer {
Message msg = new Message();
msg.setTopic(topic);
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby");
}
@@ -313,58 +313,58 @@ public class Producer {
if (done) {
System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
- statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+ statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
+ sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
} else {
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
- System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+ System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
}
}
}
class StatsBenchmarkProducer {
- private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+ private final LongAdder sendRequestSuccessCount = new LongAdder();
- private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+ private final LongAdder sendRequestFailedCount = new LongAdder();
- private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
+ private final LongAdder receiveResponseSuccessCount = new LongAdder();
- private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
+ private final LongAdder receiveResponseFailedCount = new LongAdder();
- private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+ private final LongAdder sendMessageSuccessTimeTotal = new LongAdder();
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.receiveResponseSuccessCount.get(),
- this.receiveResponseFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
+ this.sendRequestSuccessCount.longValue(),
+ this.sendRequestFailedCount.longValue(),
+ this.receiveResponseSuccessCount.longValue(),
+ this.receiveResponseFailedCount.longValue(),
+ this.sendMessageSuccessTimeTotal.longValue(),
};
return snap;
}
- public AtomicLong getSendRequestSuccessCount() {
+ public LongAdder getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
- public AtomicLong getSendRequestFailedCount() {
+ public LongAdder getSendRequestFailedCount() {
return sendRequestFailedCount;
}
- public AtomicLong getReceiveResponseSuccessCount() {
+ public LongAdder getReceiveResponseSuccessCount() {
return receiveResponseSuccessCount;
}
- public AtomicLong getReceiveResponseFailedCount() {
+ public LongAdder getReceiveResponseFailedCount() {
return receiveResponseFailedCount;
}
- public AtomicLong getSendMessageSuccessTimeTotal() {
+ public LongAdder getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}