You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/09 16:39:33 UTC
[pulsar] branch master updated: Support performance tool to print
aggregated throughput. (#4245)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new db66df2 Support performance tool to print aggregated throughput. (#4245)
db66df2 is described below
commit db66df2dd835d736ea3556e7585747ed8b665835
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Fri May 10 00:39:28 2019 +0800
Support performance tool to print aggregated throughput. (#4245)
---
.../pulsar/testclient/PerformanceConsumer.java | 20 ++++++++++++++++++++
.../pulsar/testclient/PerformanceProducer.java | 21 +++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 7346922..98b3735 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -59,6 +59,9 @@ public class PerformanceConsumer {
private static final LongAdder bytesReceived = new LongAdder();
private static final DecimalFormat dec = new DecimalFormat("0.000");
+ private static final LongAdder totalMessagesReceived = new LongAdder();
+ private static final LongAdder totalBytesReceived = new LongAdder();
+
private static Recorder recorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
private static Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
@@ -195,6 +198,9 @@ public class PerformanceConsumer {
messagesReceived.increment();
bytesReceived.add(msg.getData().length);
+ totalMessagesReceived.increment();
+ totalBytesReceived.add(msg.getData().length);
+
if (limiter != null) {
limiter.acquire();
}
@@ -280,8 +286,11 @@ public class PerformanceConsumer {
log.info("Start receiving from {} consumers on {} topics", arguments.numConsumers,
arguments.numTopics);
+ long start = System.nanoTime();
+
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
+ printAggregatedThroughput(start);
printAggregatedStats();
}
});
@@ -320,6 +329,17 @@ public class PerformanceConsumer {
pulsarClient.close();
}
+ /**
+  * Logs the throughput aggregated over the whole run: the number of messages
+  * received, the average message rate and the average bandwidth in Mbit/s.
+  *
+  * @param start the {@link System#nanoTime()} reading taken when receiving began
+  */
+ private static void printAggregatedThroughput(long start) {
+     double elapsed = (System.nanoTime() - start) / 1e9;
+     // Read each counter exactly once so the logged record count is the same
+     // value the rate was derived from, even if consumers are still running.
+     long messages = totalMessagesReceived.sum();
+     long bytes = totalBytesReceived.sum();
+     double rate = messages / elapsed;
+     // bytes/s -> bits/s (* 8) -> Mbit/s (/ 1024 / 1024)
+     double throughput = bytes / elapsed * 8 / 1024 / 1024;
+     log.info(
+         "Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s",
+         messages,
+         dec.format(rate),
+         dec.format(throughput));
+ }
+
private static void printAggregatedStats() {
Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 3d2536e..779d580 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -72,6 +72,9 @@ public class PerformanceProducer {
private static final LongAdder messagesSent = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();
+ private static final LongAdder totalMessagesSent = new LongAdder();
+ private static final LongAdder totalBytesSent = new LongAdder();
+
private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
@@ -305,8 +308,11 @@ public class PerformanceProducer {
log.info("Created {} producers", producers.size());
+ long start = System.nanoTime();
+
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
+ printAggregatedThroughput(start);
printAggregatedStats();
}
});
@@ -353,6 +359,9 @@ public class PerformanceProducer {
messagesSent.increment();
bytesSent.add(payloadData.length);
+ totalMessagesSent.increment();
+ totalBytesSent.add(payloadData.length);
+
long now = System.nanoTime();
if (now > warmupEndTime) {
long latencyMicros = NANOSECONDS.toMicros(now - sendTime);
@@ -425,6 +434,17 @@ public class PerformanceProducer {
client.close();
}
+ /**
+  * Logs the throughput aggregated over the whole run: the number of messages
+  * sent, the average message rate and the average bandwidth in Mbit/s.
+  *
+  * @param start the {@link System#nanoTime()} reading taken once the producers were created
+  */
+ private static void printAggregatedThroughput(long start) {
+     double elapsed = (System.nanoTime() - start) / 1e9;
+     // Read each counter exactly once so the logged record count is the same
+     // value the rate was derived from, even if producers are still running.
+     long messages = totalMessagesSent.sum();
+     long bytes = totalBytesSent.sum();
+     double rate = messages / elapsed;
+     // bytes/s -> MiB/s (/ 1024 / 1024) -> Mbit/s (* 8)
+     double throughput = bytes / elapsed / 1024 / 1024 * 8;
+     log.info(
+         "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+         messages,
+         totalFormat.format(rate),
+         totalFormat.format(throughput));
+ }
+
private static void printAggregatedStats() {
Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
@@ -442,5 +462,6 @@ public class PerformanceProducer {
static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+ static final DecimalFormat totalFormat = new DecimalFormat("0.000");
private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
}