You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 10:40:34 UTC
[pulsar] 04/08: [testclient] Call printAggregatedStats method before client exit (#11985)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 519dbe1eacd09ab98825920bbe5f7c6cf27e2c1a
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Sep 20 20:34:06 2021 +0800
[testclient] Call printAggregatedStats method before client exit (#11985)
(cherry picked from commit 3c770a18a807498634124161f95bc4f0888d5315)
---
.../proxy/socket/client/PerformanceClient.java | 32 ++++++++++++++++++++++
.../pulsar/testclient/ManagedLedgerWriter.java | 26 ++++++++++++++----
2 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 3902d5f..0e6f84b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -67,6 +67,8 @@ public class PerformanceClient {
static AtomicInteger msgSent = new AtomicInteger(0);
private static final LongAdder messagesSent = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();
+ private static final LongAdder totalMessagesSent = new LongAdder();
+ private static final LongAdder totalBytesSent = new LongAdder();
private JCommander jc;
@Parameters(commandDescription = "Test pulsar websocket producer performance.")
@@ -263,6 +265,8 @@ public class PerformanceClient {
producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage);
messagesSent.increment();
bytesSent.add(sizeOfMessage);
+ totalMessagesSent.increment();
+ totalBytesSent.add(sizeOfMessage);
}
}
@@ -328,6 +332,11 @@ public class PerformanceClient {
PerformanceClient test = new PerformanceClient();
Arguments arguments = test.loadArguments(args);
PerfClientUtils.printJVMInformation(log);
+ long start = System.nanoTime();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ printAggregatedThroughput(start);
+ printAggregatedStats();
+ }));
test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize,
arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams);
}
@@ -350,8 +359,31 @@ public class PerformanceClient {
}
+ private static void printAggregatedThroughput(long start) {
+ double elapsed = (System.nanoTime() - start) / 1e9;
+ double rate = totalMessagesSent.sum() / elapsed;
+ double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+ log.info(
+ "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+ totalMessagesSent,
+ totalFormat.format(rate),
+ totalFormat.format(throughput));
+ }
+
+ private static void printAggregatedStats() {
+ Histogram reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram();
+
+ log.info(
+ "Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+ dec.format(reportHistogram.getMean()), reportHistogram.getValueAtPercentile(50),
+ reportHistogram.getValueAtPercentile(95), reportHistogram.getValueAtPercentile(99),
+ reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99),
+ reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue());
+ }
+
static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+ static final DecimalFormat totalFormat = new DecimalFormat("0.000");
private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class);
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 8d4c8c2..244be81 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -72,6 +72,8 @@ public class ManagedLedgerWriter {
private static final LongAdder messagesSent = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();
+ private static final LongAdder totalMessagesSent = new LongAdder();
+ private static final LongAdder totalBytesSent = new LongAdder();
private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
@@ -211,11 +213,11 @@ public class ManagedLedgerWriter {
log.info("Created {} managed ledgers", managedLedgers.size());
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- printAggregatedStats();
- }
- });
+ long start = System.nanoTime();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ printAggregatedThroughput(start);
+ printAggregatedStats();
+ }));
Collections.shuffle(managedLedgers);
AtomicBoolean isDone = new AtomicBoolean();
@@ -245,6 +247,8 @@ public class ManagedLedgerWriter {
long sendTime = (Long) (ctx);
messagesSent.increment();
bytesSent.add(payloadData.length);
+ totalMessagesSent.increment();
+ totalBytesSent.add(payloadData.length);
long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
recorder.recordValue(latencyMicros);
@@ -376,6 +380,17 @@ public class ManagedLedgerWriter {
return map;
}
+ private static void printAggregatedThroughput(long start) {
+ double elapsed = (System.nanoTime() - start) / 1e9;
+ double rate = totalMessagesSent.sum() / elapsed;
+ double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+ log.info(
+ "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+ totalMessagesSent,
+ totalFormat.format(rate),
+ totalFormat.format(throughput));
+ }
+
private static void printAggregatedStats() {
Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
@@ -393,5 +408,6 @@ public class ManagedLedgerWriter {
static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+ static final DecimalFormat totalFormat = new DecimalFormat("0.000");
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class);
}