You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/09 16:39:33 UTC

[pulsar] branch master updated: Support performance tool to print aggregated throughput. (#4245)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db66df2  Support performance tool to print aggregated throughput. (#4245)
db66df2 is described below

commit db66df2dd835d736ea3556e7585747ed8b665835
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Fri May 10 00:39:28 2019 +0800

    Support performance tool to print aggregated throughput. (#4245)
---
 .../pulsar/testclient/PerformanceConsumer.java      | 20 ++++++++++++++++++++
 .../pulsar/testclient/PerformanceProducer.java      | 21 +++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 7346922..98b3735 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -59,6 +59,9 @@ public class PerformanceConsumer {
     private static final LongAdder bytesReceived = new LongAdder();
     private static final DecimalFormat dec = new DecimalFormat("0.000");
 
+    private static final LongAdder totalMessagesReceived = new LongAdder();
+    private static final LongAdder totalBytesReceived = new LongAdder();
+
     private static Recorder recorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
 
@@ -195,6 +198,9 @@ public class PerformanceConsumer {
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
+            totalMessagesReceived.increment();
+            totalBytesReceived.add(msg.getData().length);
+
             if (limiter != null) {
                 limiter.acquire();
             }
@@ -280,8 +286,11 @@ public class PerformanceConsumer {
         log.info("Start receiving from {} consumers on {} topics", arguments.numConsumers,
                 arguments.numTopics);
 
+        long start = System.nanoTime();
+
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
+                printAggregatedThroughput(start);
                 printAggregatedStats();
             }
         });
@@ -320,6 +329,17 @@ public class PerformanceConsumer {
         pulsarClient.close();
     }
 
+    private static void printAggregatedThroughput(long start) { // start: System.nanoTime() taken when receiving began
+        double elapsed = (System.nanoTime() - start) / 1e9; // nanoseconds -> seconds
+        double rate = totalMessagesReceived.sum() / elapsed;
+        double throughput = totalBytesReceived.sum() / elapsed * 8 / 1024 / 1024; // bytes/s -> Mbit/s
+        log.info(
+            "Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s",
+            totalMessagesReceived,
+            dec.format(rate),
+            dec.format(throughput));
+    }
+
     private static void printAggregatedStats() {
         Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 3d2536e..779d580 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -72,6 +72,9 @@ public class PerformanceProducer {
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
 
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
+
     private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
 
@@ -305,8 +308,11 @@ public class PerformanceProducer {
 
         log.info("Created {} producers", producers.size());
 
+        long start = System.nanoTime();
+
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
+                printAggregatedThroughput(start);
                 printAggregatedStats();
             }
         });
@@ -353,6 +359,9 @@ public class PerformanceProducer {
                             messagesSent.increment();
                             bytesSent.add(payloadData.length);
 
+                            totalMessagesSent.increment();
+                            totalBytesSent.add(payloadData.length);
+
                             long now = System.nanoTime();
                             if (now > warmupEndTime) {
                                 long latencyMicros = NANOSECONDS.toMicros(now - sendTime);
@@ -425,6 +434,17 @@ public class PerformanceProducer {
         client.close();
     }
 
+    private static void printAggregatedThroughput(long start) { // start: System.nanoTime() taken after producers were created
+        double elapsed = (System.nanoTime() - start) / 1e9; // nanoseconds -> seconds
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8; // bytes/s -> Mbit/s
+        log.info(
+            "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+            totalMessagesSent,
+            totalFormat.format(rate),
+            totalFormat.format(throughput));
+    }
+
     private static void printAggregatedStats() {
         Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
 
@@ -442,5 +462,6 @@ public class PerformanceProducer {
 
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
 }