You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/17 14:11:24 UTC
[pulsar] branch master updated: [improve][pulsar-perf] Transactions: improve client options (#16090)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c7847c03bc [improve][pulsar-perf] Transactions: improve client options (#16090)
6c7847c03bc is described below
commit 6c7847c03bcedf85106e14314a8ae9e6e16ac772
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Jun 17 16:11:10 2022 +0200
[improve][pulsar-perf] Transactions: improve client options (#16090)
---
.../apache/pulsar/testclient/PerfClientUtils.java | 62 +++++++++
.../testclient/PerformanceBaseArguments.java | 135 +++++++++++++++++++
.../pulsar/testclient/PerformanceConsumer.java | 116 ++--------------
.../pulsar/testclient/PerformanceProducer.java | 149 ++++-----------------
.../pulsar/testclient/PerformanceReader.java | 120 ++---------------
.../pulsar/testclient/PerformanceTransaction.java | 105 +++------------
.../pulsar/testclient/PerfClientUtilsTest.java | 91 +++++++++++++
.../testclient/PerformanceBaseArgumentsTest.java | 52 +++++++
.../src/test/resources/perf_client1.conf | 25 ++++
site2/docs/reference-cli-tools.md | 60 +++------
10 files changed, 444 insertions(+), 471 deletions(-)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 44dcad90e3e..2b0c989bfbb 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -18,11 +18,19 @@
*/
package org.apache.pulsar.testclient;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.lang.management.ManagementFactory;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.experimental.UtilityClass;
import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.slf4j.Logger;
@@ -55,5 +63,59 @@ public class PerfClientUtils {
FileUtils.byteCountToDisplaySize(Runtime.getRuntime().maxMemory()));
}
+ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArguments arguments)
+ throws PulsarClientException.UnsupportedAuthenticationException {
+
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES)
+ .serviceUrl(arguments.serviceURL)
+ .connectionsPerBroker(arguments.maxConnections)
+ .ioThreads(arguments.ioThreads)
+ .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS)
+ .enableBusyWait(arguments.enableBusyWait)
+ .listenerThreads(arguments.listenerThreads)
+ .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
+ if (isNotBlank(arguments.authPluginClassName)) {
+ clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
+ }
+
+ if (arguments.tlsAllowInsecureConnection != null) {
+ clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+ }
+
+ if (arguments.tlsHostnameVerificationEnable != null) {
+ clientBuilder.enableTlsHostnameVerification(arguments.tlsHostnameVerificationEnable);
+ }
+
+ if (isNotBlank(arguments.listenerName)) {
+ clientBuilder.listenerName(arguments.listenerName);
+ }
+ return clientBuilder;
+ }
+
+ public static PulsarAdminBuilder createAdminBuilderFromArguments(PerformanceBaseArguments arguments,
+ final String adminUrl)
+ throws PulsarClientException.UnsupportedAuthenticationException {
+
+ PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder()
+ .serviceHttpUrl(adminUrl)
+ .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
+ if (isNotBlank(arguments.authPluginClassName)) {
+ pulsarAdminBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
+ }
+
+ if (arguments.tlsAllowInsecureConnection != null) {
+ pulsarAdminBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+ }
+
+ if (arguments.tlsHostnameVerificationEnable != null) {
+ pulsarAdminBuilder.enableTlsHostnameVerification(arguments.tlsHostnameVerificationEnable);
+ }
+
+ return pulsarAdminBuilder;
+ }
+
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
new file mode 100644
index 00000000000..9c66d71ca6e
--- /dev/null
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.beust.jcommander.Parameter;
+import java.io.FileInputStream;
+import java.util.Properties;
+import lombok.SneakyThrows;
+
+
+public abstract class PerformanceBaseArguments {
+
+ @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+ boolean help;
+
+ @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
+ public String confFile;
+
+ @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
+ public String serviceURL;
+
+ @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
+ public String authPluginClassName;
+
+ @Parameter(
+ names = { "--auth-params" },
+ description = "Authentication parameters, whose format is determined by the implementation "
+ + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
+ + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
+ public String authParams;
+
+ @Parameter(names = {
+ "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
+ public String tlsTrustCertsFilePath = "";
+
+ @Parameter(names = {
+ "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
+ public Boolean tlsAllowInsecureConnection = null;
+
+ @Parameter(names = {
+ "--tls-hostname-verification" }, description = "Enable TLS hostname verification")
+ public Boolean tlsHostnameVerificationEnable = null;
+
+ @Parameter(names = { "-c",
+ "--max-connections" }, description = "Max number of TCP connections to a single broker")
+ public int maxConnections = 1;
+
+ @Parameter(names = { "-i",
+ "--stats-interval-seconds" },
+ description = "Statistics Interval Seconds. If 0, statistics will be disabled")
+ public long statsIntervalSeconds = 0;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
+ + "used for handling connections to brokers. The default value is 1.")
+ public int ioThreads = 1;
+
+ @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
+ public boolean enableBusyWait = false;
+
+ @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
+ public String listenerName = null;
+
+ @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
+ + " to be used for message listeners")
+ public int listenerThreads = 1;
+
+ public abstract void fillArgumentsFromProperties(Properties prop);
+
+ @SneakyThrows
+ public void fillArgumentsFromProperties() {
+ if (confFile == null) {
+ return;
+ }
+
+ Properties prop = new Properties(System.getProperties());
+ try (FileInputStream fis = new FileInputStream(confFile)) {
+ prop.load(fis);
+ }
+
+ if (serviceURL == null) {
+ serviceURL = prop.getProperty("brokerServiceUrl");
+ }
+
+ if (serviceURL == null) {
+ serviceURL = prop.getProperty("webServiceUrl");
+ }
+
+ // fallback to previous-version serviceUrl property to maintain backward-compatibility
+ if (serviceURL == null) {
+ serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
+ }
+
+ if (authPluginClassName == null) {
+ authPluginClassName = prop.getProperty("authPlugin", null);
+ }
+
+ if (authParams == null) {
+ authParams = prop.getProperty("authParams", null);
+ }
+
+ if (isBlank(tlsTrustCertsFilePath)) {
+ tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
+ }
+
+ if (tlsAllowInsecureConnection == null) {
+ tlsAllowInsecureConnection = Boolean.parseBoolean(prop
+ .getProperty("tlsAllowInsecureConnection", ""));
+ }
+
+ if (tlsHostnameVerificationEnable == null) {
+ tlsHostnameVerificationEnable = Boolean.parseBoolean(prop
+ .getProperty("tlsEnableHostnameVerification", ""));
+
+ }
+ fillArgumentsFromProperties(prop);
+ }
+
+}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 6b145ca288a..3ac281db6fb 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -27,7 +27,6 @@ import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
@@ -51,7 +50,6 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -87,13 +85,7 @@ public class PerformanceConsumer {
private static final Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
@Parameters(commandDescription = "Test pulsar consumer performance.")
- static class Arguments {
-
- @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
- boolean help;
-
- @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
- public String confFile;
+ static class Arguments extends PerformanceBaseArguments {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topic;
@@ -149,27 +141,9 @@ public class PerformanceConsumer {
description = "Number of messages to consume in total. If <= 0, it will keep consuming")
public long numMessages = 0;
- @Parameter(names = { "-c",
- "--max-connections" }, description = "Max number of TCP connections to a single broker")
- public int maxConnections = 1;
-
- @Parameter(names = { "-i",
- "--stats-interval-seconds" },
- description = "Statistics Interval Seconds. If 0, statistics will be disabled")
- public long statsIntervalSeconds = 0;
-
- @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
- public String serviceURL;
-
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true)
public String deprecatedAuthPluginClassName;
- @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
- public String authPluginClassName;
-
- @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
- String listenerName = null;
-
@Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
private int maxPendingChunkedMessage = 0;
@@ -182,21 +156,6 @@ public class PerformanceConsumer {
description = "Expire time in ms for incomplete chunk messages")
private long expireTimeOfIncompleteChunkedMessageMs = 0;
- @Parameter(
- names = { "--auth-params" },
- description = "Authentication parameters, whose format is determined by the implementation "
- + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
- + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
- public String authParams;
-
- @Parameter(names = {
- "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
- public String tlsTrustCertsFilePath = "";
-
- @Parameter(names = {
- "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
- public Boolean tlsAllowInsecureConnection = null;
-
@Parameter(names = { "-v",
"--encryption-key-value-file" },
description = "The file which contains the private key to decrypt payload")
@@ -206,23 +165,12 @@ public class PerformanceConsumer {
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
public long testTime = 0;
- @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
- + "used for handling connections to brokers. The default value is 1.")
- public int ioThreads = 1;
-
- @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
- + " to be used for message listeners")
- public int listenerThreads = 1;
-
@Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
public boolean batchIndexAck = false;
@Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message", arity = 1)
private boolean poolMessages = true;
- @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
- public boolean enableBusyWait = false;
-
@Parameter(names = {"-tto", "--txn-timeout"}, description = "Set the time value of transaction timeout,"
+ " and the time unit is second. (After --txn-enable setting to true, --txn-timeout takes effect)")
public long transactionTimeout = 10;
@@ -245,6 +193,10 @@ public class PerformanceConsumer {
@Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file")
public String histogramFile = null;
+
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ }
}
public static void main(String[] args) throws Exception {
@@ -308,41 +260,7 @@ public class PerformanceConsumer {
PerfClientUtils.exit(-1);
}
}
-
- if (arguments.confFile != null) {
- Properties prop = new Properties(System.getProperties());
- prop.load(new FileInputStream(arguments.confFile));
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("brokerServiceUrl");
- }
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("webServiceUrl");
- }
-
- // fallback to previous-version serviceUrl property to maintain backward-compatibility
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
- }
-
- if (arguments.authPluginClassName == null) {
- arguments.authPluginClassName = prop.getProperty("authPlugin", null);
- }
-
- if (arguments.authParams == null) {
- arguments.authParams = prop.getProperty("authParams", null);
- }
-
- if (isBlank(arguments.tlsTrustCertsFilePath)) {
- arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
- }
-
- if (arguments.tlsAllowInsecureConnection == null) {
- arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
- .getProperty("tlsAllowInsecureConnection", ""));
- }
- }
+ arguments.fillArgumentsFromProperties();
// Dump config variables
PerfClientUtils.printJVMInformation(log);
@@ -356,27 +274,9 @@ public class PerformanceConsumer {
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
- ClientBuilder clientBuilder = PulsarClient.builder() //
- .memoryLimit(0, SizeUnit.BYTES)
- .enableTransaction(arguments.isEnableTransaction)
- .serviceUrl(arguments.serviceURL) //
- .connectionsPerBroker(arguments.maxConnections) //
- .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
- .ioThreads(arguments.ioThreads) //
- .listenerThreads(arguments.listenerThreads)
- .enableBusyWait(arguments.enableBusyWait)
- .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
- if (isNotBlank(arguments.authPluginClassName)) {
- clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
- }
-
- if (arguments.tlsAllowInsecureConnection != null) {
- clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
- }
+ ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .enableTransaction(arguments.isEnableTransaction);
- if (isNotBlank(arguments.listenerName)) {
- clientBuilder.listenerName(arguments.listenerName);
- }
PulsarClient pulsarClient = clientBuilder.build();
AtomicReference<Transaction> atomicReference;
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 46155bd5eb4..446382f4dff 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -32,7 +32,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
@@ -68,7 +67,6 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -104,13 +102,7 @@ public class PerformanceProducer {
private static IMessageFormatter messageFormatter = null;
@Parameters(commandDescription = "Test pulsar producer performance.")
- static class Arguments {
-
- @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
- boolean help;
-
- @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
- public String confFile;
+ static class Arguments extends PerformanceBaseArguments {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topics;
@@ -143,33 +135,17 @@ public class PerformanceProducer {
@Parameter(names = { "-pn", "--producer-name" }, description = "Producer Name")
public String producerName = null;
- @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
- public String serviceURL;
-
@Parameter(names = { "-au", "--admin-url" }, description = "Pulsar Admin URL")
public String adminURL;
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true)
public String deprecatedAuthPluginClassName;
- @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
- public String authPluginClassName;
-
- @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
- String listenerName = null;
-
@Parameter(names = { "-ch",
"--chunking" }, description = "Should split the message and publish in chunks if message size is "
+ "larger than allowed max size")
private boolean chunkingAllowed = false;
- @Parameter(
- names = { "--auth-params" },
- description = "Authentication parameters, whose format is determined by the implementation "
- + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
- + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
- public String authParams;
-
@Parameter(names = { "-o", "--max-outstanding" }, description = "Max number of outstanding messages")
public int maxOutstanding = DEFAULT_MAX_PENDING_MESSAGES;
@@ -181,20 +157,11 @@ public class PerformanceProducer {
+ "of partitions, set 0 to not try to create the topic")
public Integer partitions = null;
- @Parameter(names = { "-c",
- "--max-connections" }, description = "Max number of TCP connections to a single broker")
- public int maxConnections = 1;
-
@Parameter(names = { "-m",
"--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep "
+ "publishing")
public long numMessages = 0;
- @Parameter(names = { "-i",
- "--stats-interval-seconds" }, description = "Statistics Interval Seconds. If 0, statistics will be "
- + "disabled")
- public long statsIntervalSeconds = 0;
-
@Parameter(names = { "-z", "--compression" }, description = "Compress messages payload")
public CompressionType compression = CompressionType.NONE;
@@ -228,14 +195,6 @@ public class PerformanceProducer {
@Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)")
public double warmupTimeSeconds = 1.0;
- @Parameter(names = {
- "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
- public String tlsTrustCertsFilePath = "";
-
- @Parameter(names = {
- "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
- public Boolean tlsAllowInsecureConnection = null;
-
@Parameter(names = { "-k", "--encryption-key-name" }, description = "The public key name to encrypt payload")
public String encKeyName = null;
@@ -256,13 +215,6 @@ public class PerformanceProducer {
+ ", valid options are: [autoIncrement, random]")
public String messageKeyGenerationMode = null;
- @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
- + "used for handling connections to brokers. The default value is 1.")
- public int ioThreads = 1;
-
- @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
- public boolean enableBusyWait = false;
-
@Parameter(names = { "-am", "--access-mode" }, description = "Producer access mode")
public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
@@ -292,6 +244,20 @@ public class PerformanceProducer {
@Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file")
public String histogramFile = null;
+
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ if (adminURL == null) {
+ adminURL = prop.getProperty("webServiceUrl");
+ }
+ if (adminURL == null) {
+ adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+ }
+
+ if (isBlank(messageKeyGenerationMode)) {
+ messageKeyGenerationMode = prop.getProperty("messageKeyGenerationMode", null);
+ }
+ }
}
public static void main(String[] args) throws Exception {
@@ -333,49 +299,7 @@ public class PerformanceProducer {
}
}
- if (arguments.confFile != null) {
- Properties prop = new Properties(System.getProperties());
- prop.load(new FileInputStream(arguments.confFile));
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("brokerServiceUrl");
- }
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("webServiceUrl");
- }
-
- // fallback to previous-version serviceUrl property to maintain backward-compatibility
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
- }
-
- if (arguments.adminURL == null) {
- arguments.adminURL = prop.getProperty("webServiceUrl");
- }
- if (arguments.adminURL == null) {
- arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
- }
-
- if (arguments.authPluginClassName == null) {
- arguments.authPluginClassName = prop.getProperty("authPlugin", null);
- }
-
- if (arguments.authParams == null) {
- arguments.authParams = prop.getProperty("authParams", null);
- }
-
- if (isBlank(arguments.tlsTrustCertsFilePath)) {
- arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
- }
- if (isBlank(arguments.messageKeyGenerationMode)) {
- arguments.messageKeyGenerationMode = prop.getProperty("messageKeyGenerationMode", null);
- }
- if (arguments.tlsAllowInsecureConnection == null) {
- arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
- .getProperty("tlsAllowInsecureConnection", ""));
- }
- }
+ arguments.fillArgumentsFromProperties();
// Dump config variables
PerfClientUtils.printJVMInformation(log);
@@ -419,28 +343,19 @@ public class PerformanceProducer {
}));
if (arguments.partitions != null) {
- PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
- .serviceHttpUrl(arguments.adminURL)
- .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+ final PulsarAdminBuilder adminBuilder = PerfClientUtils
+ .createAdminBuilderFromArguments(arguments, arguments.adminURL);
- if (isNotBlank(arguments.authPluginClassName)) {
- clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
- }
-
- if (arguments.tlsAllowInsecureConnection != null) {
- clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
- }
-
- try (PulsarAdmin client = clientBuilder.build()) {
+ try (PulsarAdmin adminClient = adminBuilder.build()) {
for (String topic : arguments.topics) {
log.info("Creating partitioned topic {} with {} partitions", topic, arguments.partitions);
try {
- client.topics().createPartitionedTopic(topic, arguments.partitions);
+ adminClient.topics().createPartitionedTopic(topic, arguments.partitions);
} catch (PulsarAdminException.ConflictException alreadyExists) {
if (log.isDebugEnabled()) {
log.debug("Topic {} already exists: {}", topic, alreadyExists);
}
- PartitionedTopicMetadata partitionedTopicMetadata = client.topics()
+ PartitionedTopicMetadata partitionedTopicMetadata = adminClient.topics()
.getPartitionedTopicMetadata(topic);
if (partitionedTopicMetadata.partitions != arguments.partitions) {
log.error("Topic {} already exists but it has a wrong number of partitions: {}, "
@@ -572,27 +487,9 @@ public class PerformanceProducer {
// Now processing command line arguments
List<Future<Producer<byte[]>>> futures = new ArrayList<>();
- ClientBuilder clientBuilder = PulsarClient.builder() //
- .memoryLimit(0, SizeUnit.BYTES)
- .enableTransaction(arguments.isEnableTransaction)//
- .serviceUrl(arguments.serviceURL) //
- .connectionsPerBroker(arguments.maxConnections) //
- .ioThreads(arguments.ioThreads) //
- .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
- .enableBusyWait(arguments.enableBusyWait)
- .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-
- if (isNotBlank(arguments.authPluginClassName)) {
- clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
- }
- if (arguments.tlsAllowInsecureConnection != null) {
- clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
- }
-
- if (isNotBlank(arguments.listenerName)) {
- clientBuilder.listenerName(arguments.listenerName);
- }
+ ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .enableTransaction(arguments.isEnableTransaction);
client = clientBuilder.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 1c99dce8c32..5245f634d1b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.testclient;
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
@@ -27,7 +25,6 @@ import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
-import java.io.FileInputStream;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
@@ -43,7 +40,6 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -64,13 +60,8 @@ public class PerformanceReader {
private static Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
@Parameters(commandDescription = "Test pulsar reader performance.")
- static class Arguments {
+ static class Arguments extends PerformanceBaseArguments {
- @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
- boolean help;
-
- @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
- public String confFile;
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topic;
@@ -95,54 +86,20 @@ public class PerformanceReader {
+ "it will keep consuming")
public long numMessages = 0;
- @Parameter(names = { "-c",
- "--max-connections" }, description = "Max number of TCP connections to a single broker")
- public int maxConnections = 1;
-
- @Parameter(names = { "-i",
- "--stats-interval-seconds" },
- description = "Statistics Interval Seconds. If 0, statistics will be disabled")
- public long statsIntervalSeconds = 0;
-
- @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
- public String serviceURL;
-
- @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
- public String authPluginClassName;
-
- @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
- String listenerName = null;
-
- @Parameter(
- names = { "--auth-params" },
- description = "Authentication parameters, whose format is determined by the implementation "
- + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
- + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
- public String authParams;
-
@Parameter(names = {
"--use-tls" }, description = "Use TLS encryption on the connection")
public boolean useTls;
- @Parameter(names = {
- "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
- public String tlsTrustCertsFilePath = "";
-
- @Parameter(names = {
- "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
- public Boolean tlsAllowInsecureConnection = null;
-
@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
public long testTime = 0;
- @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
- + "used for handling connections to brokers, default is 1 thread")
- public int ioThreads = 1;
-
- @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
- + " to be used for message listeners")
- public int listenerThreads = 1;
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ if (!useTls) {
+ useTls = Boolean.parseBoolean(prop.getProperty("useTls"));
+ }
+ }
}
public static void main(String[] args) throws Exception {
@@ -178,45 +135,7 @@ public class PerformanceReader {
PerfClientUtils.exit(-1);
}
}
-
- if (arguments.confFile != null) {
- Properties prop = new Properties(System.getProperties());
- prop.load(new FileInputStream(arguments.confFile));
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("brokerServiceUrl");
- }
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("webServiceUrl");
- }
-
- // fallback to previous-version serviceUrl property to maintain backward-compatibility
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
- }
-
- if (arguments.authPluginClassName == null) {
- arguments.authPluginClassName = prop.getProperty("authPlugin", null);
- }
-
- if (arguments.authParams == null) {
- arguments.authParams = prop.getProperty("authParams", null);
- }
-
- if (!arguments.useTls) {
- arguments.useTls = Boolean.parseBoolean(prop.getProperty("useTls"));
- }
-
- if (isBlank(arguments.tlsTrustCertsFilePath)) {
- arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
- }
-
- if (arguments.tlsAllowInsecureConnection == null) {
- arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
- .getProperty("tlsAllowInsecureConnection", ""));
- }
- }
+ arguments.fillArgumentsFromProperties();
// Dump config variables
PerfClientUtils.printJVMInformation(log);
@@ -257,27 +176,8 @@ public class PerformanceReader {
}
};
- ClientBuilder clientBuilder = PulsarClient.builder() //
- .memoryLimit(0, SizeUnit.BYTES)
- .serviceUrl(arguments.serviceURL) //
- .connectionsPerBroker(arguments.maxConnections) //
- .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
- .ioThreads(arguments.ioThreads) //
- .listenerThreads(arguments.listenerThreads)
- .enableTls(arguments.useTls) //
- .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-
- if (isNotBlank(arguments.authPluginClassName)) {
- clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
- }
-
- if (arguments.tlsAllowInsecureConnection != null) {
- clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
- }
-
- if (isNotBlank(arguments.listenerName)) {
- clientBuilder.listenerName(arguments.listenerName);
- }
+ ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .enableTls(arguments.useTls);
PulsarClient pulsarClient = clientBuilder.build();
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 1258918b18a..1be3644021b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -19,14 +19,12 @@
package org.apache.pulsar.testclient;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
@@ -62,7 +60,6 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -97,13 +94,7 @@ public class PerformanceTransaction {
@Parameters(commandDescription = "Test pulsar transaction performance.")
- static class Arguments {
-
- @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
- boolean help;
-
- @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
- public String confFile;
+ static class Arguments extends PerformanceBaseArguments {
@Parameter(names = "--topics-c", description = "All topics that need ack for a transaction", required =
true)
@@ -120,39 +111,18 @@ public class PerformanceTransaction {
+ "thereby increasing the intensity of the stress test.")
public int numTestThreads = 1;
- @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
- public String authPluginClassName;
-
- @Parameter(
- names = { "--auth-params" },
- description = "Authentication parameters, whose format is determined by the implementation "
- + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
- + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
- public String authParams;
-
@Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
public String adminURL;
- @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
- public String serviceURL;
-
@Parameter(names = {"-np",
"--partitions"}, description = "Create partitioned topics with a given number of partitions, 0 means"
+ "not trying to create a topic")
public Integer partitions = null;
- @Parameter(names = {"-c",
- "--max-connections"}, description = "Max number of TCP connections to a single broker")
- public int maxConnections = 1;
-
@Parameter(names = {"-time",
"--test-duration"}, description = "Test duration (in second). 0 means keeping publishing")
public long testTime = 0;
- @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
- + "used for handling connections to brokers. The default value is 1.")
- public int ioThreads = 1;
-
@Parameter(names = {"-ss",
"--subscriptions"}, description = "A list of subscriptions to consume (for example, sub1,sub2)")
public List<String> subscriptions = Collections.singletonList("sub");
@@ -198,6 +168,16 @@ public class PerformanceTransaction {
@Parameter(names = "-txnRate", description = "Set the rate of opened transaction or task. 0 means no limit")
public int openTxnRate = 0;
+
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ if (adminURL == null) {
+ adminURL = prop.getProperty("webServiceUrl");
+ }
+ if (adminURL == null) {
+ adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+ }
+ }
}
public static void main(String[] args)
@@ -218,40 +198,7 @@ public class PerformanceTransaction {
jc.usage();
PerfClientUtils.exit(-1);
}
-
- if (arguments.confFile != null) {
- Properties prop = new Properties(System.getProperties());
- prop.load(new FileInputStream(arguments.confFile));
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("brokerServiceUrl");
- }
-
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("webServiceUrl");
- }
-
- // fallback to previous-version serviceUrl property to maintain backward-compatibility
- if (arguments.serviceURL == null) {
- arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
- }
-
- if (arguments.adminURL == null) {
- arguments.adminURL = prop.getProperty("webServiceUrl");
- }
- if (arguments.adminURL == null) {
- arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
- }
-
- if (arguments.authPluginClassName == null) {
- arguments.authPluginClassName = prop.getProperty("authPlugin", null);
- }
-
- if (arguments.authParams == null) {
- arguments.authParams = prop.getProperty("authParams", null);
- }
- }
-
+ arguments.fillArgumentsFromProperties();
// Dump config variables
PerfClientUtils.printJVMInformation(log);
@@ -266,24 +213,20 @@ public class PerformanceTransaction {
payloadBytes[i] = (byte) (random.nextInt(26) + 65);
}
if (arguments.partitions != null) {
- PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
- .serviceHttpUrl(arguments.adminURL);
+ final PulsarAdminBuilder adminBuilder = PerfClientUtils
+ .createAdminBuilderFromArguments(arguments, arguments.adminURL);
- if (isNotBlank(arguments.authPluginClassName)) {
- clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
- }
-
- try (PulsarAdmin client = clientBuilder.build()) {
+ try (PulsarAdmin adminClient = adminBuilder.build()) {
for (String topic : arguments.producerTopic) {
log.info("Creating produce partitioned topic {} with {} partitions", topic, arguments.partitions);
try {
- client.topics().createPartitionedTopic(topic, arguments.partitions);
+ adminClient.topics().createPartitionedTopic(topic, arguments.partitions);
} catch (PulsarAdminException.ConflictException alreadyExists) {
if (log.isDebugEnabled()) {
log.debug("Topic {} already exists: {}", topic, alreadyExists);
}
PartitionedTopicMetadata partitionedTopicMetadata =
- client.topics().getPartitionedTopicMetadata(topic);
+ adminClient.topics().getPartitionedTopicMetadata(topic);
if (partitionedTopicMetadata.partitions != arguments.partitions) {
log.error(
"Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
@@ -295,18 +238,8 @@ public class PerformanceTransaction {
}
}
- ClientBuilder clientBuilder =
- PulsarClient.builder()
- .memoryLimit(0, SizeUnit.BYTES)
- .enableTransaction(!arguments.isDisableTransaction)
- .serviceUrl(arguments.serviceURL)
- .connectionsPerBroker(arguments.maxConnections)
- .statsInterval(0, TimeUnit.SECONDS)
- .ioThreads(arguments.ioThreads);
-
- if (isNotBlank(arguments.authPluginClassName)) {
- clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
- }
+ ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .enableTransaction(!arguments.isDisableTransaction);
PulsarClient client = clientBuilder.build();
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerfClientUtilsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerfClientUtilsTest.java
new file mode 100644
index 00000000000..78dbc1fff88
--- /dev/null
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerfClientUtilsTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class PerfClientUtilsTest {
+
+ public static class MyAuth implements Authentication {
+ @Override
+ public String getAuthMethodName() {
+ return null;
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ @Test
+ public void testClientCreation() throws Exception {
+
+ final PerformanceBaseArguments args = new PerformanceBaseArguments() {
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ }
+ };
+
+ args.tlsHostnameVerificationEnable = true;
+ args.authPluginClassName = MyAuth.class.getName();
+ args.authParams = "params";
+ args.enableBusyWait = true;
+ args.maxConnections = 100;
+ args.ioThreads = 16;
+ args.listenerName = "listener";
+ args.listenerThreads = 12;
+ args.statsIntervalSeconds = Long.MAX_VALUE;
+ args.serviceURL = "pulsar+ssl://my-pulsar:6651";
+ args.tlsTrustCertsFilePath = "path";
+ args.tlsAllowInsecureConnection = true;
+
+ final ClientBuilderImpl builder = (ClientBuilderImpl)PerfClientUtils.createClientBuilderFromArguments(args);
+ final ClientConfigurationData conf = builder.getClientConfigurationData();
+
+ Assert.assertTrue(conf.isTlsHostnameVerificationEnable());
+ Assert.assertEquals(conf.getAuthPluginClassName(), MyAuth.class.getName());
+ Assert.assertEquals(conf.getAuthParams(), "params");
+ Assert.assertTrue(conf.isEnableBusyWait());
+ Assert.assertEquals(conf.getConnectionsPerBroker(), 100);
+ Assert.assertEquals(conf.getNumIoThreads(), 16);
+ Assert.assertEquals(conf.getListenerName(), "listener");
+ Assert.assertEquals(conf.getNumListenerThreads(), 12);
+ Assert.assertEquals(conf.getStatsIntervalSeconds(), Long.MAX_VALUE);
+ Assert.assertEquals(conf.getServiceUrl(), "pulsar+ssl://my-pulsar:6651");
+ Assert.assertEquals(conf.getTlsTrustCertsFilePath(), "path");
+ Assert.assertTrue(conf.isTlsAllowInsecureConnection());
+
+ }
+}
\ No newline at end of file
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
new file mode 100644
index 00000000000..0b244a5a4e1
--- /dev/null
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PerformanceBaseArgumentsTest {
+
+ @Test
+ public void testReadFromConfigFile() {
+
+ AtomicBoolean called = new AtomicBoolean();
+
+ final PerformanceBaseArguments args = new PerformanceBaseArguments() {
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ called.set(true);
+ }
+ };
+ args.confFile = "./src/test/resources/perf_client1.conf";
+ args.fillArgumentsFromProperties();
+ Assert.assertTrue(called.get());
+ Assert.assertEquals(args.serviceURL, "https://my-pulsar:8443/");
+ Assert.assertEquals(args.authPluginClassName,
+ "org.apache.pulsar.testclient.PerfClientUtilsTest.MyAuth");
+ Assert.assertEquals(args.authParams, "myparams");
+ Assert.assertEquals(args.tlsTrustCertsFilePath, "./path");
+ Assert.assertTrue(args.tlsAllowInsecureConnection);
+ Assert.assertTrue(args.tlsHostnameVerificationEnable);
+ }
+
+}
\ No newline at end of file
diff --git a/pulsar-testclient/src/test/resources/perf_client1.conf b/pulsar-testclient/src/test/resources/perf_client1.conf
new file mode 100644
index 00000000000..127960618bf
--- /dev/null
+++ b/pulsar-testclient/src/test/resources/perf_client1.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+brokerServiceUrl=https://my-pulsar:8443/
+authPlugin=org.apache.pulsar.testclient.PerfClientUtilsTest.MyAuth
+authParams=myparams
+tlsTrustCertsFilePath=./path
+tlsAllowInsecureConnection=true
+tlsEnableHostnameVerification=true
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 0e533e10662..19a9f808e23 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -537,6 +537,24 @@ The table below lists the environment variables that you can use to configure th
|`PULSAR_EXTRA_CLASSPATH`|Extra paths for Pulsar's classpath||
|`PULSAR_GC_LOG`|Gc options to be passed to the jvm||
+Commands `consume`, `produce`, `read` and `transaction` share the following client options:
+
+|Flag|Description|Default|
+|---|---|---|
+|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
+|`--auth-plugin`|Authentication plugin class name||
+|`-bw`, `--busy-wait`|Enable or disable Busy-Wait on the Pulsar client|false|
+|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
+|`-cf`, `--conf-file`|Configuration file||
+|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
+|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
+|`--listener-name`|Listener name for the broker||
+|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
+|`--tls-allow-insecure`|Allow insecure TLS connection||
+|`--tls-hostname-verification`|Enable TLS hostname verification||
+|`--trust-cert-file`|Path for the trusted TLS certificate file||
+|`-u`, `--service-url`|Pulsar service URL||
+
### `consume`
Run a consumer
@@ -553,23 +571,14 @@ Options
|Flag|Description|Default|
|---|---|---|
-|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
-|`--auth-plugin`|Authentication plugin class name||
|`-ac`, `--auto_ack_chunk_q_full`|Auto ack for the oldest message in consumer's receiver queue if the queue full|false|
-|`--listener-name`|Listener name for the broker||
|`--acks-delay-millis`|Acknowledgements grouping delay in millis|100|
|`--batch-index-ack`|Enable or disable the batch index acknowledgment|false|
-|`-bw`, `--busy-wait`|Enable or disable Busy-Wait on the Pulsar client|false|
|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
-|`-h`, `--help`|Help message|false|
-|`-cf`, `--conf-file`|Configuration file||
|`-m`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
|`-e`, `--expire_time_incomplete_chunked_messages`|The expiration time for incomplete chunk messages (in milliseconds)|0|
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-mc`, `--max_chunked_msg`|Max pending chunk messages|0|
|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
-|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
-|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
|`-ns`, `--num-subscriptions`|Number of subscriptions (per topic)|1|
|`-t`, `--num-topics`|The number of topics|1|
|`-pm`, `--pool-messages`|Use the pooled message|true|
@@ -577,15 +586,12 @@ Options
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
|`-p`, `--receiver-queue-size-across-partitions`|Max total size of the receiver queue across partitions|50000|
|`--replicated`|Whether the subscription status should be replicated|false|
-|`-u`, `--service-url`|Pulsar service URL||
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
|`-s`, `--subscriber-name`|Subscriber name prefix||
|`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
-|`--trust-cert-file`|Path for the trusted TLS certificate file||
-|`--tls-allow-insecure`|Allow insecure TLS connection||
+
Below are **transaction** related options.
@@ -616,29 +622,22 @@ Options
|---|---|---|
|`-am`, `--access-mode`|Producer access mode. Valid values are `Shared`, `Exclusive` and `WaitForExclusive`|Shared|
|`-au`, `--admin-url`|Pulsar admin URL||
-|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
-|`--auth-plugin`|Authentication plugin class name||
-|`--listener-name`|Listener name for the broker||
|`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
|`-bb`, `--batch-max-bytes`|Maximum number of bytes per batch|4194304|
|`-bm`, `--batch-max-messages`|Maximum number of messages per batch|1000|
-|`-bw`, `--busy-wait`|Enable or disable Busy-Wait on the Pulsar client|false|
|`-ch`, `--chunking`|Split the message and publish in chunks if the message size is larger than allowed max size|false|
|`-d`, `--delay`|Mark messages with a given delay in seconds|0s|
|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
-|`-cf`, `--conf-file`|Configuration file||
|`-k`, `--encryption-key-name`|The public key name to encrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the public key to encrypt payload||
|`-ef`, `--exit-on-failure`|Exit from the process on publish failure|false|
|`-fc`, `--format-class`|Custom Formatter class name|org.apache.pulsar.testclient.DefaultMessageFormatter|
|`-fp`, `--format-payload`|Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds|false|
|`-h`, `--help`|Help message|false|
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
|`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000|
|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-mk`, `--message-key-generation-mode`|The generation mode of message key. Valid options are `autoIncrement`, `random`||
-|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
|`-n`, `--num-producers`|The number of producers (per topic)|1|
|`-threads`, `--num-test-threads`|Number of test threads|1|
|`-t`, `--num-topic`|The number of topics|1|
@@ -649,13 +648,9 @@ Options
|`-r`, `--rate`|Publish rate msg/s across topics|100|
|`--send-timeout`|Set the sendTimeout|0|
|`--separator`|Separator between the topic and topic number|-|
-|`-u`, `--service-url`|Pulsar service URL||
|`-s`, `--size`|Message size (in bytes)|1024|
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
-|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--warmup-time`|Warm-up time in seconds|1|
-|`--tls-allow-insecure`|Allow insecure TLS connection||
Below are **transaction** related options.
@@ -683,25 +678,14 @@ Options
|Flag|Description|Default|
|---|---|---|
-|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
-|`--auth-plugin`|Authentication plugin class name||
-|`--listener-name`|Listener name for the broker||
-|`-cf`, `--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
|`-n`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
-|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
-|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
|`-t`, `--num-topics`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
-|`-u`, `--service-url`|Pulsar service URL||
|`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest|
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
-|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--use-tls`|Use TLS encryption on the connection|false|
-|`--tls-allow-insecure`|Allow insecure TLS connection||
### `websocket-producer`
Run a websocket producer
@@ -839,13 +823,8 @@ $ pulsar-perf transaction options
|Flag|Description|Default|
|---|---|---|
-`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.|N/A
-`--auth-plugin`|Authentication plugin class name.|N/A
`-au`, `--admin-url`|Pulsar admin URL.|N/A
-`-cf`, `--conf-file`|Configuration file.|N/A
`-h`, `--help`|Help messages.|N/A
-`-c`, `--max-connections`|Maximum number of TCP connections to a single broker.|100
-`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers. |1
`-ns`, `--num-subscriptions`|Number of subscriptions per topic.|1
`-threads`, `--num-test-threads`|Number of test threads. <br /><br />This thread is for a new transaction to ack messages from consumer topics, produce messages to producer topics, and commit or abort this transaction. <br /><br /> Increasing the number of threads increases the parallelism of the performance test, consequently, it increases the intensity of the stress test.|1
`-nmc`, `--numMessage-perTransaction-consume`|Set the number of messages consumed in a transaction. <br /><br /> If transaction is disabled, it means the number of messages consumed in a task instead of in a transaction.|1
@@ -853,7 +832,6 @@ $ pulsar-perf transaction options
`-ntxn`, `--number-txn`|Set the number of transactions. <br /><br /> 0 means the number of transactions is unlimited. <br /><br /> If transaction is disabled, it means the number of tasks instead of transactions. |0
`-np`, `--partitions`|Create partitioned topics with a given number of partitions. <br /><br /> 0 means not trying to create a topic.
`-q`, `--receiver-queue-size`|Size of the receiver queue.|1000
-`-u`, `--service-url`|Pulsar service URL.|N/A
`-sp`, `--subscription-position`|Subscription position.|Earliest
`-st`, `--subscription-type`|Subscription type.|Shared
`-ss`, `--subscriptions`|A list of subscriptions to consume. <br /><br /> For example, sub1,sub2.|[sub]