You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/11 00:23:08 UTC

[pulsar] 01/08: pulsar-perf: print total number of messages (#10765)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 263df0248ec339b92a3184f2efae4da79828c5c3
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Jun 8 10:28:04 2021 +0200

    pulsar-perf: print total number of messages (#10765)
    
    
    (cherry picked from commit d93b775fb61af7c84d6960918bf9fa95b9b1d245)
---
 .../main/java/org/apache/pulsar/testclient/PerformanceConsumer.java | 6 +++++-
 .../main/java/org/apache/pulsar/testclient/PerformanceProducer.java | 6 ++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 12cc29a..c755cc0 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.JvmMetrics;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +61,7 @@ import com.google.common.util.concurrent.RateLimiter;
 public class PerformanceConsumer {
     private static final LongAdder messagesReceived = new LongAdder();
     private static final LongAdder bytesReceived = new LongAdder();
+    private static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     private static final DecimalFormat dec = new DecimalFormat("0.000");
 
     private static final LongAdder totalMessagesReceived = new LongAdder();
@@ -401,13 +403,15 @@ public class PerformanceConsumer {
 
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
+            long total = totalMessagesReceived.sum();
             double rate = messagesReceived.sumThenReset() / elapsed;
             double throughput = bytesReceived.sumThenReset() / elapsed * 8 / 1024 / 1024;
 
             reportHistogram = recorder.getIntervalHistogram(reportHistogram);
 
             log.info(
-                    "Throughput received: {}  msg/s -- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    "Throughput received: {} msg --- {}  msg/s -- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    intFormat.format(total),
                     dec.format(rate), dec.format(throughput), dec.format(reportHistogram.getMean()),
                     reportHistogram.getValueAtPercentile(50), reportHistogram.getValueAtPercentile(95),
                     reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 846bc96..f198a2d 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -453,7 +453,7 @@ public class PerformanceProducer {
 
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
-
+            long total = totalMessagesSent.sum();
             double rate = messagesSent.sumThenReset() / elapsed;
             double failureRate = messagesFailed.sumThenReset() / elapsed;
             double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;
@@ -461,7 +461,8 @@ public class PerformanceProducer {
             reportHistogram = recorder.getIntervalHistogram(reportHistogram);
 
             log.info(
-                    "Throughput produced: {}  msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    "Throughput produced: {} msg --- {} msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    intFormat.format(total),
                     throughputFormat.format(rate), throughputFormat.format(throughput),
                     throughputFormat.format(failureRate),
                     dec.format(reportHistogram.getMean() / 1000.0),
@@ -714,6 +715,7 @@ public class PerformanceProducer {
 
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);