You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/09 11:33:47 UTC
[rocketmq] branch develop updated: [ISSUE #4487]Optimize the output of pressure results (#5282)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f4ff22564 [ISSUE #4487]Optimize the output of pressure results (#5282)
f4ff22564 is described below
commit f4ff225649cd3a106a0e3da408355a3f0676d194
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Wed Nov 9 19:33:41 2022 +0800
[ISSUE #4487]Optimize the output of pressure results (#5282)
Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
.../rocketmq/example/benchmark/BatchProducer.java | 9 ++---
.../rocketmq/example/benchmark/Consumer.java | 8 ++---
.../rocketmq/example/benchmark/Producer.java | 8 ++---
.../example/benchmark/TransactionProducer.java | 39 +++++++++++-----------
4 files changed, 33 insertions(+), 31 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index 098dc11bb..23e922766 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
@@ -72,8 +73,8 @@ public class BatchProducer {
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
- System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n",
- topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
+ System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, aclEnable: %s%n",
+ topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
@@ -386,8 +387,8 @@ class StatsBenchmarkBatchProducer {
final double averageRT = (end[5] - begin[5]) / (double) (end[1] - begin[1]);
final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
- System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n",
- System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
+ System.out.printf("Current Time: %s | Send TPS: %d | Send MPS: %d | Max RT(ms): %d | Average RT(ms): %7.3f | Average Message RT(ms): %7.3f | Send Failed: %d | Send Message Failed: %d%n",
+ UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 23a272bfc..87388edc9 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
@@ -79,7 +80,7 @@ public class Consumer {
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
+ new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<>();
@@ -110,9 +111,8 @@ public class Consumer {
statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
- System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
- System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
- );
+ System.out.printf("Current Time: %s | Consume TPS: %d | AVG(B2C) RT(ms): %7.3f | AVG(S2C) RT(ms): %7.3f | MAX(B2C) RT(ms): %d | MAX(S2C) RT(ms): %d | Consume Fail: %d%n",
+ UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), consumeTps, averageB2CRT, averageS2CRT, b2cMax, s2cMax, failCount);
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 7112c89a1..24266a7b1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -79,8 +79,8 @@ public class Producer {
final boolean asyncEnable = commandLine.hasOption('y') && Boolean.parseBoolean(commandLine.getOptionValue('y'));
final int threadCount = asyncEnable ? 1 : commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
- System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d " +
- "traceEnable: %s aclEnable: %s messageQuantity: %d%ndelayEnable: %s delayLevel: %s%n" +
+ System.out.printf("topic: %s, threadCount: %d, messageSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, " +
+ "traceEnable: %s, aclEnable: %s, messageQuantity: %d, delayEnable: %s, delayLevel: %s, " +
"asyncEnable: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum,
delayEnable, delayLevel, asyncEnable);
@@ -357,11 +357,11 @@ public class Producer {
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
if (done) {
- System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
+ System.out.printf("[Complete] Send Total: %d | Send TPS: %d | Max RT(ms): %d | Average RT(ms): %7.3f | Send Failed: %d | Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
} else {
- System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
+ System.out.printf("Current Time: %s | Send TPS: %d | Max RT(ms): %d | Average RT(ms): %7.3f | Send Failed: %d | Response Failed: %d%n",
UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 993224e3d..ebe3e01fd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
@@ -84,7 +85,7 @@ public class TransactionProducer {
final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
+ new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Snapshot> snapshotList = new LinkedList<>();
@@ -105,7 +106,7 @@ public class TransactionProducer {
Snapshot end = snapshotList.getLast();
final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount)
- + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
+ + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount);
@@ -115,9 +116,9 @@ public class TransactionProducer {
final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;
System.out.printf(
- "Current Time: %s Send TPS:%5d Max RT(ms):%5d AVG RT(ms):%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
- System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
- unexpectedCheck, dupCheck);
+ "Current Time: %s | Send TPS: %5d | Max RT(ms): %5d | AVG RT(ms): %3.1f | Send Failed: %d | Check: %d | UnexpectedCheck: %d | DuplicatedCheck: %d%n",
+ UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
+ unexpectedCheck, dupCheck);
statsBenchmark.getSendMessageMaxRT().set(0);
}
}
@@ -140,11 +141,11 @@ public class TransactionProducer {
}
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
final TransactionMQProducer producer = new TransactionMQProducer(
- null,
- "benchmark_transaction_producer",
- rpcHook,
- config.msgTraceEnable,
- null);
+ null,
+ "benchmark_transaction_producer",
+ rpcHook,
+ config.msgTraceEnable,
+ null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
@@ -163,7 +164,7 @@ public class TransactionProducer {
final long beginTimestamp = System.currentTimeMillis();
try {
SendResult sendResult =
- producer.sendMessageInTransaction(buildMessage(config), null);
+ producer.sendMessageInTransaction(buildMessage(config), null);
success = sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK;
} catch (Throwable e) {
success = false;
@@ -173,7 +174,7 @@ public class TransactionProducer {
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT()
- .compareAndSet(prevMaxRT, currentRT);
+ .compareAndSet(prevMaxRT, currentRT);
if (updated)
break;
@@ -364,10 +365,10 @@ class TransactionListenerImpl implements TransactionListener {
}
if (msgMeta.sendResult != LocalTransactionState.UNKNOW) {
System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n",
- new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
- msg.getMsgId(), msg.getTransactionId(),
- msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
- msgMeta.sendResult.toString());
+ new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
+ msg.getMsgId(), msg.getTransactionId(),
+ msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
+ msgMeta.sendResult.toString());
statBenchmark.getUnexpectedCheckCount().increment();
return msgMeta.sendResult;
}
@@ -376,9 +377,9 @@ class TransactionListenerImpl implements TransactionListener {
LocalTransactionState s = msgMeta.checkResult.get(i);
if (s != LocalTransactionState.UNKNOW) {
System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult,lastCheckResult=%s\n",
- new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
- msg.getMsgId(), msg.getTransactionId(),
- msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
+ new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
+ msg.getMsgId(), msg.getTransactionId(),
+ msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
statBenchmark.getUnexpectedCheckCount().increment();
return s;
}