You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 10:40:34 UTC

[pulsar] 04/08: [testclient] Call printAggregatedStats method before client exit (#11985)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 519dbe1eacd09ab98825920bbe5f7c6cf27e2c1a
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Sep 20 20:34:06 2021 +0800

    [testclient] Call printAggregatedStats method before client exit (#11985)
    
    
    (cherry picked from commit 3c770a18a807498634124161f95bc4f0888d5315)
---
 .../proxy/socket/client/PerformanceClient.java     | 32 ++++++++++++++++++++++
 .../pulsar/testclient/ManagedLedgerWriter.java     | 26 ++++++++++++++----
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 3902d5f..0e6f84b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -67,6 +67,8 @@ public class PerformanceClient {
     static AtomicInteger msgSent = new AtomicInteger(0);
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
     private JCommander jc;
 
     @Parameters(commandDescription = "Test pulsar websocket producer performance.")
@@ -263,6 +265,8 @@ public class PerformanceClient {
                         producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage);
                         messagesSent.increment();
                         bytesSent.add(sizeOfMessage);
+                        totalMessagesSent.increment();
+                        totalBytesSent.add(sizeOfMessage);
                     }
                 }
 
@@ -328,6 +332,11 @@ public class PerformanceClient {
         PerformanceClient test = new PerformanceClient();
         Arguments arguments = test.loadArguments(args);
         PerfClientUtils.printJVMInformation(log);
+        long start = System.nanoTime();
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
         test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize,
                 arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams);
     }
@@ -350,8 +359,31 @@ public class PerformanceClient {
 
     }
 
+    private static void printAggregatedThroughput(long start) {
+        double elapsed = (System.nanoTime() - start) / 1e9;
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+        log.info(
+                "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+                totalMessagesSent,
+                totalFormat.format(rate),
+                totalFormat.format(throughput));
+    }
+
+    private static void printAggregatedStats() {
+        Histogram reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram();
+
+        log.info(
+                "Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean()), reportHistogram.getValueAtPercentile(50),
+                reportHistogram.getValueAtPercentile(95), reportHistogram.getValueAtPercentile(99),
+                reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99),
+                reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue());
+    }
+
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class);
 
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 8d4c8c2..244be81 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -72,6 +72,8 @@ public class ManagedLedgerWriter {
 
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
 
     private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
@@ -211,11 +213,11 @@ public class ManagedLedgerWriter {
 
         log.info("Created {} managed ledgers", managedLedgers.size());
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                printAggregatedStats();
-            }
-        });
+        long start = System.nanoTime();
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
 
         Collections.shuffle(managedLedgers);
         AtomicBoolean isDone = new AtomicBoolean();
@@ -245,6 +247,8 @@ public class ManagedLedgerWriter {
                             long sendTime = (Long) (ctx);
                             messagesSent.increment();
                             bytesSent.add(payloadData.length);
+                            totalMessagesSent.increment();
+                            totalBytesSent.add(payloadData.length);
 
                             long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
                             recorder.recordValue(latencyMicros);
@@ -376,6 +380,17 @@ public class ManagedLedgerWriter {
         return map;
     }
 
+    private static void printAggregatedThroughput(long start) {
+        double elapsed = (System.nanoTime() - start) / 1e9;
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+        log.info(
+                "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+                totalMessagesSent,
+                totalFormat.format(rate),
+                totalFormat.format(throughput));
+    }
+
     private static void printAggregatedStats() {
         Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
 
@@ -393,5 +408,6 @@ public class ManagedLedgerWriter {
 
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class);
 }