You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/09/21 23:46:45 UTC
[pulsar] branch master updated: make pulsar-perf ioThread number
configurable (#8090)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1abb2b4 make pulsar-perf ioThread number configurable (#8090)
1abb2b4 is described below
commit 1abb2b4f7d93792d45b41026488567f037eb78e7
Author: hangc0276 <ha...@163.com>
AuthorDate: Tue Sep 22 07:46:27 2020 +0800
make pulsar-perf ioThread number configurable (#8090)
### Motivation
In pulsar-perf, the Pulsar client's ioThread number defaults to `Runtime.getRuntime().availableProcessors()` and cannot be configured on the command line. When running the pulsar-perf producer, this may cause message enqueue contention and lead to high latency.
### Changes
1. Make the ioThread number configurable on the command line
2. Change the default ioThread number from `Runtime.getRuntime().availableProcessors()` to `1`
---
.../java/org/apache/pulsar/testclient/PerformanceConsumer.java | 6 +++++-
.../java/org/apache/pulsar/testclient/PerformanceProducer.java | 8 ++++++--
.../main/java/org/apache/pulsar/testclient/PerformanceReader.java | 6 +++++-
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index f118c12..552b146 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -158,6 +158,10 @@ public class PerformanceConsumer {
@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
public long testTime = 0;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+ "used for handling connections to brokers, default is 1 thread")
+ public int ioThreads = 1;
}
public static void main(String[] args) throws Exception {
@@ -260,7 +264,7 @@ public class PerformanceConsumer {
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
.statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
- .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .ioThreads(arguments.ioThreads) //
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
if (isNotBlank(arguments.authPluginClassName)) {
clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 812cf0d..9153812 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -211,6 +211,10 @@ public class PerformanceProducer {
@Parameter(names = {"-mk", "--message-key-generation-mode"}, description = "The generation mode of message key" +
", valid options are: [autoIncrement, random]")
public String messageKeyGenerationMode = null;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+ "used for handling connections to brokers, default is 1 thread")
+ public int ioThreads = 1;
}
static class EncKeyReader implements CryptoKeyReader {
@@ -426,7 +430,7 @@ public class PerformanceProducer {
ClientBuilder clientBuilder = PulsarClient.builder() //
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
- .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .ioThreads(arguments.ioThreads) //
.statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
@@ -628,6 +632,6 @@ public class PerformanceProducer {
private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
public enum MessageKeyGenerationMode {
- autoIncrement,random;
+ autoIncrement,random
}
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 694b667..0d21196 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -117,6 +117,10 @@ public class PerformanceReader {
@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
public long testTime = 0;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
+ "used for handling connections to brokers, default is 1 thread")
+ public int ioThreads = 1;
}
public static void main(String[] args) throws Exception {
@@ -211,7 +215,7 @@ public class PerformanceReader {
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
.statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
- .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .ioThreads(arguments.ioThreads) //
.enableTls(arguments.useTls) //
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);