You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/09/21 23:46:45 UTC

[pulsar] branch master updated: make pulsar-perf ioThread number configurable (#8090)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1abb2b4  make pulsar-perf ioThread number configurable (#8090)
1abb2b4 is described below

commit 1abb2b4f7d93792d45b41026488567f037eb78e7
Author: hangc0276 <ha...@163.com>
AuthorDate: Tue Sep 22 07:46:27 2020 +0800

    make pulsar-perf ioThread number configurable (#8090)
    
    ### Motivation
    In pulser-perf, the default pulsar client ioThread number is `Runtime.getRuntime().availableProcessors()` and can't be configured in commandline. When running pulsar-perf producer, it may cause message enqueue competition and lead to high latency.
    
    ### Changes
    1. make ioThread number configurable in command line
    2. change the default ioThead number from `Runtime.getRuntime().availableProcessors()` to `1`
---
 .../java/org/apache/pulsar/testclient/PerformanceConsumer.java    | 6 +++++-
 .../java/org/apache/pulsar/testclient/PerformanceProducer.java    | 8 ++++++--
 .../main/java/org/apache/pulsar/testclient/PerformanceReader.java | 6 +++++-
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index f118c12..552b146 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -158,6 +158,10 @@ public class PerformanceConsumer {
         @Parameter(names = { "-time",
                 "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
         public long testTime = 0;
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
     }
 
     public static void main(String[] args) throws Exception {
@@ -260,7 +264,7 @@ public class PerformanceConsumer {
                 .serviceUrl(arguments.serviceURL) //
                 .connectionsPerBroker(arguments.maxConnections) //
                 .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
-                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .ioThreads(arguments.ioThreads) //
                 .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
         if (isNotBlank(arguments.authPluginClassName)) {
             clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 812cf0d..9153812 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -211,6 +211,10 @@ public class PerformanceProducer {
         @Parameter(names = {"-mk", "--message-key-generation-mode"}, description = "The generation mode of message key" +
                 ", valid options are: [autoIncrement, random]")
         public String messageKeyGenerationMode = null;
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
     }
 
     static class EncKeyReader implements CryptoKeyReader {
@@ -426,7 +430,7 @@ public class PerformanceProducer {
             ClientBuilder clientBuilder = PulsarClient.builder() //
                     .serviceUrl(arguments.serviceURL) //
                     .connectionsPerBroker(arguments.maxConnections) //
-                    .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                    .ioThreads(arguments.ioThreads) //
                     .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
                     .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
 
@@ -628,6 +632,6 @@ public class PerformanceProducer {
     private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
 
     public enum MessageKeyGenerationMode {
-        autoIncrement,random;
+        autoIncrement,random
     }
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 694b667..0d21196 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -117,6 +117,10 @@ public class PerformanceReader {
         @Parameter(names = { "-time",
                 "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
         public long testTime = 0;
+
+        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+                "used for handling connections to brokers, default is 1 thread")
+        public int ioThreads = 1;
     }
 
     public static void main(String[] args) throws Exception {
@@ -211,7 +215,7 @@ public class PerformanceReader {
                 .serviceUrl(arguments.serviceURL) //
                 .connectionsPerBroker(arguments.maxConnections) //
                 .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
-                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .ioThreads(arguments.ioThreads) //
                 .enableTls(arguments.useTls) //
                 .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);