You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/27 03:33:09 UTC

[pulsar] branch master updated: Add test time for perf consumer and perf reader (#7044)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f43fc6  Add test time for perf consumer and perf reader (#7044)
5f43fc6 is described below

commit 5f43fc6547b104dc367e8359cf5d4aad06670b0f
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed May 27 11:32:48 2020 +0800

    Add test time for perf consumer and perf reader (#7044)
---
 .../org/apache/pulsar/testclient/PerformanceConsumer.java  | 14 +++++++++++++-
 .../org/apache/pulsar/testclient/PerformanceReader.java    | 13 ++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 443f3bd..b667eff 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -136,6 +136,10 @@ public class PerformanceConsumer {
         @Parameter(names = { "-v",
                 "--encryption-key-value-file" }, description = "The file which contains the private key to decrypt payload")
         public String encKeyFile = null;
+
+        @Parameter(names = { "-time",
+                "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+        public long testTime = 0;
     }
 
     public static void main(String[] args) throws Exception {
@@ -200,8 +204,16 @@ public class PerformanceConsumer {
         final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));
 
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
-
+        long startTime = System.nanoTime();
+        long testEndTime = startTime + (long) (arguments.testTime * 1e9);
         MessageListener<byte[]> listener = (consumer, msg) -> {
+            if (arguments.testTime > 0) {
+                if (System.nanoTime() > testEndTime) {
+                    log.info("------------------- DONE -----------------------");
+                    printAggregatedStats();
+                    System.exit(0);
+                }
+            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 73db7d7..9e5480e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -106,6 +106,10 @@ public class PerformanceReader {
         @Parameter(names = {
                 "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
         public String tlsTrustCertsFilePath = "";
+
+        @Parameter(names = { "-time",
+                "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+        public long testTime = 0;
     }
 
     public static void main(String[] args) throws Exception {
@@ -174,8 +178,15 @@ public class PerformanceReader {
         final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));
 
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
-
+        long startTime = System.nanoTime();
+        long testEndTime = startTime + (long) (arguments.testTime * 1e9);
         ReaderListener<byte[]> listener = (reader, msg) -> {
+            if (arguments.testTime > 0) {
+                if (System.nanoTime() > testEndTime) {
+                    log.info("------------------- DONE -----------------------");
+                    System.exit(0);
+                }
+            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);