You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/04 22:23:47 UTC
[rocketmq] branch develop updated: [ISSUE #3284]Optimizing benchmark code (#3317)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 857d28d [ISSUE #3284]Optimizing benchmark code (#3317)
857d28d is described below
commit 857d28df5e6dfe697fd01d86bcc1a48c0bdef234
Author: 【keepal】 <97...@qq.com>
AuthorDate: Sun Sep 5 06:23:32 2021 +0800
[ISSUE #3284]Optimizing benchmark code (#3317)
---
.../rocketmq/example/benchmark/BatchProducer.java | 68 +++++++++++-----------
.../example/benchmark/TransactionProducer.java | 57 +++++++++---------
2 files changed, 65 insertions(+), 60 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index 843b84b..f3e8b60 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -26,6 +26,8 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -100,22 +102,22 @@ public class BatchProducer {
try {
long beginTimestamp = System.currentTimeMillis();
- long sendSucCount = statsBenchmark.getSendMessageSuccessCount().get();
+ long sendSucCount = statsBenchmark.getSendMessageSuccessCount().longValue();
setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000));
setTags(tagCount, msgs, sendSucCount);
setProperties(propertySize, msgs);
SendResult sendResult = producer.send(msgs);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
- statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
- statsBenchmark.getSendMessageSuccessCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestSuccessCount().increment();
+ statsBenchmark.getSendMessageSuccessCount().add(msgs.size());
} else {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestFailedCount().increment();
+ statsBenchmark.getSendMessageFailedCount().add(msgs.size());
}
long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
- long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+ statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
+ long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
if (updated) {
@@ -125,8 +127,8 @@ public class BatchProducer {
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
} catch (RemotingException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestFailedCount().increment();
+ statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
@@ -134,22 +136,22 @@ public class BatchProducer {
} catch (InterruptedException ignored) {
}
} catch (InterruptedException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestFailedCount().increment();
+ statsBenchmark.getSendMessageFailedCount().add(msgs.size());
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
}
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestFailedCount().increment();
+ statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQClientException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestFailedCount().increment();
+ statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQBrokerException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
+ statsBenchmark.getSendRequestFailedCount().increment();
+ statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
@@ -313,17 +315,17 @@ public class BatchProducer {
class StatsBenchmarkBatchProducer {
- private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+ private final LongAdder sendRequestSuccessCount = new LongAdder();
- private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+ private final LongAdder sendRequestFailedCount = new LongAdder();
- private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+ private final LongAdder sendMessageSuccessTimeTotal = new LongAdder();
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
- private final AtomicLong sendMessageSuccessCount = new AtomicLong(0L);
+ private final LongAdder sendMessageSuccessCount = new LongAdder();
- private final AtomicLong sendMessageFailedCount = new AtomicLong(0L);
+ private final LongAdder sendMessageFailedCount = new LongAdder();
private final Timer timer = new Timer("BenchmarkTimerThread", true);
@@ -332,25 +334,25 @@ class StatsBenchmarkBatchProducer {
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.sendMessageSuccessCount.get(),
- this.sendMessageFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
+ this.sendRequestSuccessCount.longValue(),
+ this.sendRequestFailedCount.longValue(),
+ this.sendMessageSuccessCount.longValue(),
+ this.sendMessageFailedCount.longValue(),
+ this.sendMessageSuccessTimeTotal.longValue(),
};
return snap;
}
- public AtomicLong getSendRequestSuccessCount() {
+ public LongAdder getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
- public AtomicLong getSendRequestFailedCount() {
+ public LongAdder getSendRequestFailedCount() {
return sendRequestFailedCount;
}
- public AtomicLong getSendMessageSuccessTimeTotal() {
+ public LongAdder getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}
@@ -358,11 +360,11 @@ class StatsBenchmarkBatchProducer {
return sendMessageMaxRT;
}
- public AtomicLong getSendMessageSuccessCount() {
+ public LongAdder getSendMessageSuccessCount() {
return sendMessageSuccessCount;
}
- public AtomicLong getSendMessageFailedCount() {
+ public LongAdder getSendMessageFailedCount() {
return sendMessageFailedCount;
}
@@ -390,7 +392,7 @@ class StatsBenchmarkBatchProducer {
final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n",
- System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().get(), averageRT, averageMsgRT, end[2], end[4]);
+ System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index c4f14a4..767a96b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -50,10 +50,11 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
public class TransactionProducer {
private static final long START_TIME = System.currentTimeMillis();
- private static final AtomicLong MSG_COUNT = new AtomicLong(0);
+ private static final LongAdder MSG_COUNT = new LongAdder();
//broker max check times should less than this value
static final int MAX_CHECK_RESULT_IN_MSG = 20;
@@ -158,7 +159,7 @@ public class TransactionProducer {
success = false;
} finally {
final long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageTimeTotal().addAndGet(currentRT);
+ statsBenchmark.getSendMessageTimeTotal().add(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT()
@@ -169,9 +170,9 @@ public class TransactionProducer {
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
if (success) {
- statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+ statsBenchmark.getSendRequestSuccessCount().increment();
} else {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ statsBenchmark.getSendRequestFailedCount().increment();
}
if (config.sendInterval > 0) {
try {
@@ -194,7 +195,9 @@ public class TransactionProducer {
ByteBuffer buf = ByteBuffer.wrap(bs);
buf.putLong(config.batchId);
long sendMachineId = START_TIME << 32;
- long msgId = sendMachineId | MSG_COUNT.getAndIncrement();
+ long count = MSG_COUNT.longValue();
+ long msgId = sendMachineId | count;
+ MSG_COUNT.increment();
buf.putLong(msgId);
// save send tx result in message
@@ -316,7 +319,7 @@ class TransactionListenerImpl implements TransactionListener {
// message not generated in this test
return LocalTransactionState.ROLLBACK_MESSAGE;
}
- statBenchmark.getCheckCount().incrementAndGet();
+ statBenchmark.getCheckCount().increment();
int times = 0;
try {
@@ -339,7 +342,7 @@ class TransactionListenerImpl implements TransactionListener {
dup = newCheckLog.equals(oldCheckLog);
}
if (dup) {
- statBenchmark.getDuplicatedCheckCount().incrementAndGet();
+ statBenchmark.getDuplicatedCheckCount().increment();
}
if (msgMeta.sendResult != LocalTransactionState.UNKNOW) {
System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n",
@@ -347,7 +350,7 @@ class TransactionListenerImpl implements TransactionListener {
msg.getMsgId(), msg.getTransactionId(),
msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
msgMeta.sendResult.toString());
- statBenchmark.getUnexpectedCheckCount().incrementAndGet();
+ statBenchmark.getUnexpectedCheckCount().increment();
return msgMeta.sendResult;
}
@@ -358,7 +361,7 @@ class TransactionListenerImpl implements TransactionListener {
new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
msg.getMsgId(), msg.getTransactionId(),
msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
- statBenchmark.getUnexpectedCheckCount().incrementAndGet();
+ statBenchmark.getUnexpectedCheckCount().increment();
return s;
}
}
@@ -385,42 +388,42 @@ class Snapshot {
}
class StatsBenchmarkTProducer {
- private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+ private final LongAdder sendRequestSuccessCount = new LongAdder();
- private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+ private final LongAdder sendRequestFailedCount = new LongAdder();
- private final AtomicLong sendMessageTimeTotal = new AtomicLong(0L);
+ private final LongAdder sendMessageTimeTotal = new LongAdder();
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
- private final AtomicLong checkCount = new AtomicLong(0L);
+ private final LongAdder checkCount = new LongAdder();
- private final AtomicLong unexpectedCheckCount = new AtomicLong(0L);
+ private final LongAdder unexpectedCheckCount = new LongAdder();
- private final AtomicLong duplicatedCheckCount = new AtomicLong(0);
+ private final LongAdder duplicatedCheckCount = new LongAdder();
public Snapshot createSnapshot() {
Snapshot s = new Snapshot();
s.endTime = System.currentTimeMillis();
- s.sendRequestSuccessCount = sendRequestSuccessCount.get();
- s.sendRequestFailedCount = sendRequestFailedCount.get();
- s.sendMessageTimeTotal = sendMessageTimeTotal.get();
+ s.sendRequestSuccessCount = sendRequestSuccessCount.longValue();
+ s.sendRequestFailedCount = sendRequestFailedCount.longValue();
+ s.sendMessageTimeTotal = sendMessageTimeTotal.longValue();
s.sendMessageMaxRT = sendMessageMaxRT.get();
- s.checkCount = checkCount.get();
- s.unexpectedCheckCount = unexpectedCheckCount.get();
- s.duplicatedCheck = duplicatedCheckCount.get();
+ s.checkCount = checkCount.longValue();
+ s.unexpectedCheckCount = unexpectedCheckCount.longValue();
+ s.duplicatedCheck = duplicatedCheckCount.longValue();
return s;
}
- public AtomicLong getSendRequestSuccessCount() {
+ public LongAdder getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
- public AtomicLong getSendRequestFailedCount() {
+ public LongAdder getSendRequestFailedCount() {
return sendRequestFailedCount;
}
- public AtomicLong getSendMessageTimeTotal() {
+ public LongAdder getSendMessageTimeTotal() {
return sendMessageTimeTotal;
}
@@ -428,15 +431,15 @@ class StatsBenchmarkTProducer {
return sendMessageMaxRT;
}
- public AtomicLong getCheckCount() {
+ public LongAdder getCheckCount() {
return checkCount;
}
- public AtomicLong getUnexpectedCheckCount() {
+ public LongAdder getUnexpectedCheckCount() {
return unexpectedCheckCount;
}
- public AtomicLong getDuplicatedCheckCount() {
+ public LongAdder getDuplicatedCheckCount() {
return duplicatedCheckCount;
}
}