You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/17 14:11:24 UTC

[pulsar] branch master updated: [improve][pulsar-perf] Transactions: improve client options (#16090)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c7847c03bc [improve][pulsar-perf] Transactions: improve client options (#16090)
6c7847c03bc is described below

commit 6c7847c03bcedf85106e14314a8ae9e6e16ac772
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Jun 17 16:11:10 2022 +0200

    [improve][pulsar-perf] Transactions: improve client options (#16090)
---
 .../apache/pulsar/testclient/PerfClientUtils.java  |  62 +++++++++
 .../testclient/PerformanceBaseArguments.java       | 135 +++++++++++++++++++
 .../pulsar/testclient/PerformanceConsumer.java     | 116 ++--------------
 .../pulsar/testclient/PerformanceProducer.java     | 149 ++++-----------------
 .../pulsar/testclient/PerformanceReader.java       | 120 ++---------------
 .../pulsar/testclient/PerformanceTransaction.java  | 105 +++------------
 .../pulsar/testclient/PerfClientUtilsTest.java     |  91 +++++++++++++
 .../testclient/PerformanceBaseArgumentsTest.java   |  52 +++++++
 .../src/test/resources/perf_client1.conf           |  25 ++++
 site2/docs/reference-cli-tools.md                  |  60 +++------
 10 files changed, 444 insertions(+), 471 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 44dcad90e3e..2b0c989bfbb 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -18,11 +18,19 @@
  */
 package org.apache.pulsar.testclient;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import java.lang.management.ManagementFactory;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.experimental.UtilityClass;
 import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.slf4j.Logger;
 
@@ -55,5 +63,59 @@ public class PerfClientUtils {
                 FileUtils.byteCountToDisplaySize(Runtime.getRuntime().maxMemory()));
     }
 
+    public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArguments arguments)
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        // Configures a ClientBuilder from the shared base arguments and returns it unbuilt to the caller.
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES)
+                .serviceUrl(arguments.serviceURL)
+                .connectionsPerBroker(arguments.maxConnections)
+                .ioThreads(arguments.ioThreads)
+                .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS)
+                .enableBusyWait(arguments.enableBusyWait)
+                .listenerThreads(arguments.listenerThreads)
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
+        if (isNotBlank(arguments.authPluginClassName)) { // authentication is only set when a plugin is named
+            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
+        }
+
+        if (arguments.tlsAllowInsecureConnection != null) { // null = unset: builder setting left untouched
+            clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+        }
+
+        if (arguments.tlsHostnameVerificationEnable != null) { // null = unset: builder setting left untouched
+            clientBuilder.enableTlsHostnameVerification(arguments.tlsHostnameVerificationEnable);
+        }
+
+        if (isNotBlank(arguments.listenerName)) { // listener name is applied only when non-blank
+            clientBuilder.listenerName(arguments.listenerName);
+        }
+        return clientBuilder;
+    }
+
+    public static PulsarAdminBuilder createAdminBuilderFromArguments(PerformanceBaseArguments arguments,
+                                                                     final String adminUrl)
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        // Configures an admin builder for adminUrl from the base arguments' trust-cert, auth and TLS options.
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder()
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
+        if (isNotBlank(arguments.authPluginClassName)) { // authentication is only set when a plugin is named
+            pulsarAdminBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
+        }
+
+        if (arguments.tlsAllowInsecureConnection != null) { // null = unset: builder setting left untouched
+            pulsarAdminBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
+        }
+
+        if (arguments.tlsHostnameVerificationEnable != null) { // null = unset: builder setting left untouched
+            pulsarAdminBuilder.enableTlsHostnameVerification(arguments.tlsHostnameVerificationEnable);
+        }
+
+        return pulsarAdminBuilder; // returned unbuilt; the caller invokes build()
+    }
+
 
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
new file mode 100644
index 00000000000..9c66d71ca6e
--- /dev/null
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.beust.jcommander.Parameter;
+import java.io.FileInputStream;
+import java.util.Properties;
+import lombok.SneakyThrows;
+
+/** Command-line options and conf-file loading shared by the perf-tool Arguments classes that extend it. */
+public abstract class PerformanceBaseArguments {
+
+    @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+    boolean help;
+
+    @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
+    public String confFile;
+
+    @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
+    public String serviceURL;
+
+    @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
+    public String authPluginClassName;
+
+    @Parameter(
+            names = { "--auth-params" },
+            description = "Authentication parameters, whose format is determined by the implementation "
+                    + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
+                    + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
+    public String authParams;
+
+    @Parameter(names = {
+            "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
+    public String tlsTrustCertsFilePath = "";
+
+    @Parameter(names = {
+            "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
+    public Boolean tlsAllowInsecureConnection = null; // boxed so that null can mean "not set"
+
+    @Parameter(names = {
+            "--tls-hostname-verification" }, description = "Enable TLS hostname verification")
+    public Boolean tlsHostnameVerificationEnable = null; // boxed so that null can mean "not set"
+
+    @Parameter(names = { "-c",
+            "--max-connections" }, description = "Max number of TCP connections to a single broker")
+    public int maxConnections = 1;
+
+    @Parameter(names = { "-i",
+            "--stats-interval-seconds" },
+            description = "Statistics Interval Seconds. If 0, statistics will be disabled")
+    public long statsIntervalSeconds = 0;
+
+    @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
+            + "used for handling connections to brokers. The default value is 1.")
+    public int ioThreads = 1;
+
+    @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
+    public boolean enableBusyWait = false;
+
+    @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
+    public String listenerName = null;
+
+    @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
+            + " to be used for message listeners")
+    public int listenerThreads = 1;
+
+    public abstract void fillArgumentsFromProperties(Properties prop); // subclass hook, invoked below
+
+    @SneakyThrows
+    public void fillArgumentsFromProperties() {
+        if (confFile == null) { // no conf file given: nothing is loaded and the subclass hook is not called
+            return;
+        }
+
+        Properties prop = new Properties(System.getProperties()); // system properties serve as defaults
+        try (FileInputStream fis = new FileInputStream(confFile)) {
+            prop.load(fis);
+        }
+        // Options that already hold a value are kept; the file only fills in the ones that are still unset.
+        if (serviceURL == null) {
+            serviceURL = prop.getProperty("brokerServiceUrl");
+        }
+
+        if (serviceURL == null) {
+            serviceURL = prop.getProperty("webServiceUrl");
+        }
+
+        // fallback to previous-version serviceUrl property to maintain backward-compatibility
+        if (serviceURL == null) {
+            serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
+        }
+
+        if (authPluginClassName == null) {
+            authPluginClassName = prop.getProperty("authPlugin", null);
+        }
+
+        if (authParams == null) {
+            authParams = prop.getProperty("authParams", null);
+        }
+
+        if (isBlank(tlsTrustCertsFilePath)) {
+            tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
+        }
+
+        if (tlsAllowInsecureConnection == null) { // a missing key falls back to "" and parses as false
+            tlsAllowInsecureConnection = Boolean.parseBoolean(prop
+                    .getProperty("tlsAllowInsecureConnection", ""));
+        }
+
+        if (tlsHostnameVerificationEnable == null) { // a missing key falls back to "" and parses as false
+            tlsHostnameVerificationEnable = Boolean.parseBoolean(prop
+                    .getProperty("tlsEnableHostnameVerification", ""));
+
+        }
+        fillArgumentsFromProperties(prop); // hand the loaded properties to the concrete subclass
+    }
+
+}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 6b145ca288a..3ac281db6fb 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -27,7 +27,6 @@ import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.util.concurrent.RateLimiter;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
@@ -51,7 +50,6 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -87,13 +85,7 @@ public class PerformanceConsumer {
     private static final Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
 
     @Parameters(commandDescription = "Test pulsar consumer performance.")
-    static class Arguments {
-
-        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
-        boolean help;
-
-        @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
-        public String confFile;
+    static class Arguments extends PerformanceBaseArguments {
 
         @Parameter(description = "persistent://prop/ns/my-topic", required = true)
         public List<String> topic;
@@ -149,27 +141,9 @@ public class PerformanceConsumer {
                 description = "Number of messages to consume in total. If <= 0, it will keep consuming")
         public long numMessages = 0;
 
-        @Parameter(names = { "-c",
-                "--max-connections" }, description = "Max number of TCP connections to a single broker")
-        public int maxConnections = 1;
-
-        @Parameter(names = { "-i",
-                "--stats-interval-seconds" },
-                description = "Statistics Interval Seconds. If 0, statistics will be disabled")
-        public long statsIntervalSeconds = 0;
-
-        @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
-        public String serviceURL;
-
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true)
         public String deprecatedAuthPluginClassName;
 
-        @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
-        public String authPluginClassName;
-
-        @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
-        String listenerName = null;
-
         @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
         private int maxPendingChunkedMessage = 0;
 
@@ -182,21 +156,6 @@ public class PerformanceConsumer {
                 description = "Expire time in ms for incomplete chunk messages")
         private long expireTimeOfIncompleteChunkedMessageMs = 0;
 
-        @Parameter(
-            names = { "--auth-params" },
-            description = "Authentication parameters, whose format is determined by the implementation "
-                    + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
-                    + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
-        public String authParams;
-
-        @Parameter(names = {
-                "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
-        public String tlsTrustCertsFilePath = "";
-
-        @Parameter(names = {
-                "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
-        public Boolean tlsAllowInsecureConnection = null;
-
         @Parameter(names = { "-v",
                 "--encryption-key-value-file" },
                 description = "The file which contains the private key to decrypt payload")
@@ -206,23 +165,12 @@ public class PerformanceConsumer {
                 "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
         public long testTime = 0;
 
-        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
-                + "used for handling connections to brokers. The default value is 1.")
-        public int ioThreads = 1;
-
-        @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
-                + " to be used for message listeners")
-        public int listenerThreads = 1;
-
         @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
         public boolean batchIndexAck = false;
 
         @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message", arity = 1)
         private boolean poolMessages = true;
 
-        @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
-        public boolean enableBusyWait = false;
-
         @Parameter(names = {"-tto", "--txn-timeout"},  description = "Set the time value of transaction timeout,"
                 + " and the time unit is second. (After --txn-enable setting to true, --txn-timeout takes effect)")
         public long transactionTimeout = 10;
@@ -245,6 +193,10 @@ public class PerformanceConsumer {
 
         @Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file")
         public String histogramFile = null;
+
+        @Override
+        public void fillArgumentsFromProperties(Properties prop) { // empty: reads no extra properties
+        }
     }
 
     public static void main(String[] args) throws Exception {
@@ -308,41 +260,7 @@ public class PerformanceConsumer {
                 PerfClientUtils.exit(-1);
             }
         }
-
-        if (arguments.confFile != null) {
-            Properties prop = new Properties(System.getProperties());
-            prop.load(new FileInputStream(arguments.confFile));
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
-            }
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("webServiceUrl");
-            }
-
-            // fallback to previous-version serviceUrl property to maintain backward-compatibility
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
-            }
-
-            if (arguments.authPluginClassName == null) {
-                arguments.authPluginClassName = prop.getProperty("authPlugin", null);
-            }
-
-            if (arguments.authParams == null) {
-                arguments.authParams = prop.getProperty("authParams", null);
-            }
-
-            if (isBlank(arguments.tlsTrustCertsFilePath)) {
-                arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
-            }
-
-            if (arguments.tlsAllowInsecureConnection == null) {
-                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
-                        .getProperty("tlsAllowInsecureConnection", ""));
-            }
-        }
+        arguments.fillArgumentsFromProperties();
 
         // Dump config variables
         PerfClientUtils.printJVMInformation(log);
@@ -356,27 +274,9 @@ public class PerformanceConsumer {
         long startTime = System.nanoTime();
         long testEndTime = startTime + (long) (arguments.testTime * 1e9);
 
-        ClientBuilder clientBuilder = PulsarClient.builder() //
-                .memoryLimit(0, SizeUnit.BYTES)
-                .enableTransaction(arguments.isEnableTransaction)
-                .serviceUrl(arguments.serviceURL) //
-                .connectionsPerBroker(arguments.maxConnections) //
-                .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
-                .ioThreads(arguments.ioThreads) //
-                .listenerThreads(arguments.listenerThreads)
-                .enableBusyWait(arguments.enableBusyWait)
-                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-        if (isNotBlank(arguments.authPluginClassName)) {
-            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
-        }
-
-        if (arguments.tlsAllowInsecureConnection != null) {
-            clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
-        }
+        ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+                .enableTransaction(arguments.isEnableTransaction);
 
-        if (isNotBlank(arguments.listenerName)) {
-            clientBuilder.listenerName(arguments.listenerName);
-        }
         PulsarClient pulsarClient = clientBuilder.build();
 
         AtomicReference<Transaction> atomicReference;
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 46155bd5eb4..446382f4dff 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -32,7 +32,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
@@ -68,7 +67,6 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -104,13 +102,7 @@ public class PerformanceProducer {
     private static IMessageFormatter messageFormatter = null;
 
     @Parameters(commandDescription = "Test pulsar producer performance.")
-    static class Arguments {
-
-        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
-        boolean help;
-
-        @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
-        public String confFile;
+    static class Arguments extends PerformanceBaseArguments {
 
         @Parameter(description = "persistent://prop/ns/my-topic", required = true)
         public List<String> topics;
@@ -143,33 +135,17 @@ public class PerformanceProducer {
         @Parameter(names = { "-pn", "--producer-name" }, description = "Producer Name")
         public String producerName = null;
 
-        @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
-        public String serviceURL;
-
         @Parameter(names = { "-au", "--admin-url" }, description = "Pulsar Admin URL")
         public String adminURL;
 
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true)
         public String deprecatedAuthPluginClassName;
 
-        @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
-        public String authPluginClassName;
-
-        @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
-        String listenerName = null;
-
         @Parameter(names = { "-ch",
                 "--chunking" }, description = "Should split the message and publish in chunks if message size is "
                 + "larger than allowed max size")
         private boolean chunkingAllowed = false;
 
-        @Parameter(
-            names = { "--auth-params" },
-            description = "Authentication parameters, whose format is determined by the implementation "
-                    + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
-                    + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
-        public String authParams;
-
         @Parameter(names = { "-o", "--max-outstanding" }, description = "Max number of outstanding messages")
         public int maxOutstanding = DEFAULT_MAX_PENDING_MESSAGES;
 
@@ -181,20 +157,11 @@ public class PerformanceProducer {
                 + "of partitions, set 0 to not try to create the topic")
         public Integer partitions = null;
 
-        @Parameter(names = { "-c",
-                "--max-connections" }, description = "Max number of TCP connections to a single broker")
-        public int maxConnections = 1;
-
         @Parameter(names = { "-m",
                 "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep "
                 + "publishing")
         public long numMessages = 0;
 
-        @Parameter(names = { "-i",
-                "--stats-interval-seconds" }, description = "Statistics Interval Seconds. If 0, statistics will be "
-                + "disabled")
-        public long statsIntervalSeconds = 0;
-
         @Parameter(names = { "-z", "--compression" }, description = "Compress messages payload")
         public CompressionType compression = CompressionType.NONE;
 
@@ -228,14 +195,6 @@ public class PerformanceProducer {
         @Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)")
         public double warmupTimeSeconds = 1.0;
 
-        @Parameter(names = {
-                "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
-        public String tlsTrustCertsFilePath = "";
-
-        @Parameter(names = {
-                "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
-        public Boolean tlsAllowInsecureConnection = null;
-
         @Parameter(names = { "-k", "--encryption-key-name" }, description = "The public key name to encrypt payload")
         public String encKeyName = null;
 
@@ -256,13 +215,6 @@ public class PerformanceProducer {
                 + ", valid options are: [autoIncrement, random]")
         public String messageKeyGenerationMode = null;
 
-        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
-                + "used for handling connections to brokers. The default value is 1.")
-        public int ioThreads = 1;
-
-        @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
-        public boolean enableBusyWait = false;
-
         @Parameter(names = { "-am", "--access-mode" }, description = "Producer access mode")
         public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
 
@@ -292,6 +244,20 @@ public class PerformanceProducer {
 
         @Parameter(names = { "--histogram-file" }, description = "HdrHistogram output file")
         public String histogramFile = null;
+
+        @Override
+        public void fillArgumentsFromProperties(Properties prop) { // producer-only keys; set values are kept
+            if (adminURL == null) { // first choice: the webServiceUrl property
+                adminURL = prop.getProperty("webServiceUrl");
+            }
+            if (adminURL == null) { // otherwise the adminURL property, else the localhost default
+                adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+            }
+
+            if (isBlank(messageKeyGenerationMode)) { // stays null when the property is absent
+                messageKeyGenerationMode = prop.getProperty("messageKeyGenerationMode", null);
+            }
+        }
     }
 
     public static void main(String[] args) throws Exception {
@@ -333,49 +299,7 @@ public class PerformanceProducer {
             }
         }
 
-        if (arguments.confFile != null) {
-            Properties prop = new Properties(System.getProperties());
-            prop.load(new FileInputStream(arguments.confFile));
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
-            }
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("webServiceUrl");
-            }
-
-            // fallback to previous-version serviceUrl property to maintain backward-compatibility
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
-            }
-
-            if (arguments.adminURL == null) {
-                arguments.adminURL = prop.getProperty("webServiceUrl");
-            }
-            if (arguments.adminURL == null) {
-                arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
-            }
-
-            if (arguments.authPluginClassName == null) {
-                arguments.authPluginClassName = prop.getProperty("authPlugin", null);
-            }
-
-            if (arguments.authParams == null) {
-                arguments.authParams = prop.getProperty("authParams", null);
-            }
-
-            if (isBlank(arguments.tlsTrustCertsFilePath)) {
-               arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
-            }
-            if (isBlank(arguments.messageKeyGenerationMode)) {
-                arguments.messageKeyGenerationMode = prop.getProperty("messageKeyGenerationMode", null);
-            }
-            if (arguments.tlsAllowInsecureConnection == null) {
-                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
-                        .getProperty("tlsAllowInsecureConnection", ""));
-            }
-        }
+        arguments.fillArgumentsFromProperties();
 
         // Dump config variables
         PerfClientUtils.printJVMInformation(log);
@@ -419,28 +343,19 @@ public class PerformanceProducer {
         }));
 
         if (arguments.partitions  != null) {
-            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
-                    .serviceHttpUrl(arguments.adminURL)
-                    .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+            final PulsarAdminBuilder adminBuilder = PerfClientUtils
+                    .createAdminBuilderFromArguments(arguments, arguments.adminURL);
 
-            if (isNotBlank(arguments.authPluginClassName)) {
-                clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
-            }
-
-            if (arguments.tlsAllowInsecureConnection != null) {
-                clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
-            }
-
-            try (PulsarAdmin client = clientBuilder.build()) {
+            try (PulsarAdmin adminClient = adminBuilder.build()) {
                 for (String topic : arguments.topics) {
                     log.info("Creating partitioned topic {} with {} partitions", topic, arguments.partitions);
                     try {
-                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                        adminClient.topics().createPartitionedTopic(topic, arguments.partitions);
                     } catch (PulsarAdminException.ConflictException alreadyExists) {
                         if (log.isDebugEnabled()) {
                             log.debug("Topic {} already exists: {}", topic, alreadyExists);
                         }
-                        PartitionedTopicMetadata partitionedTopicMetadata = client.topics()
+                        PartitionedTopicMetadata partitionedTopicMetadata = adminClient.topics()
                                 .getPartitionedTopicMetadata(topic);
                         if (partitionedTopicMetadata.partitions != arguments.partitions) {
                             log.error("Topic {} already exists but it has a wrong number of partitions: {}, "
@@ -572,27 +487,9 @@ public class PerformanceProducer {
             // Now processing command line arguments
             List<Future<Producer<byte[]>>> futures = new ArrayList<>();
 
-            ClientBuilder clientBuilder = PulsarClient.builder() //
-                    .memoryLimit(0, SizeUnit.BYTES)
-                    .enableTransaction(arguments.isEnableTransaction)//
-                    .serviceUrl(arguments.serviceURL) //
-                    .connectionsPerBroker(arguments.maxConnections) //
-                    .ioThreads(arguments.ioThreads) //
-                    .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
-                    .enableBusyWait(arguments.enableBusyWait)
-                    .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-
-            if (isNotBlank(arguments.authPluginClassName)) {
-                clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
-            }
 
-            if (arguments.tlsAllowInsecureConnection != null) {
-                clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
-            }
-
-            if (isNotBlank(arguments.listenerName)) {
-                clientBuilder.listenerName(arguments.listenerName);
-            }
+            ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+                    .enableTransaction(arguments.isEnableTransaction);
 
             client = clientBuilder.build();
             ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 1c99dce8c32..5245f634d1b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.testclient;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -27,7 +25,6 @@ import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.util.concurrent.RateLimiter;
-import java.io.FileInputStream;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,7 +40,6 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -64,13 +60,8 @@ public class PerformanceReader {
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
 
     @Parameters(commandDescription = "Test pulsar reader performance.")
-    static class Arguments {
+    static class Arguments extends PerformanceBaseArguments {
 
-        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
-        boolean help;
-
-        @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
-        public String confFile;
 
         @Parameter(description = "persistent://prop/ns/my-topic", required = true)
         public List<String> topic;
@@ -95,54 +86,20 @@ public class PerformanceReader {
                 + "it will keep consuming")
         public long numMessages = 0;
 
-        @Parameter(names = { "-c",
-                "--max-connections" }, description = "Max number of TCP connections to a single broker")
-        public int maxConnections = 1;
-
-        @Parameter(names = { "-i",
-                "--stats-interval-seconds" },
-                description = "Statistics Interval Seconds. If 0, statistics will be disabled")
-        public long statsIntervalSeconds = 0;
-
-        @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL")
-        public String serviceURL;
-
-        @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
-        public String authPluginClassName;
-
-        @Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
-        String listenerName = null;
-
-        @Parameter(
-            names = { "--auth-params" },
-            description = "Authentication parameters, whose format is determined by the implementation "
-                    + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
-                    + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
-        public String authParams;
-
         @Parameter(names = {
                 "--use-tls" }, description = "Use TLS encryption on the connection")
         public boolean useTls;
 
-        @Parameter(names = {
-                "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
-        public String tlsTrustCertsFilePath = "";
-
-        @Parameter(names = {
-                "--tls-allow-insecure" }, description = "Allow insecure TLS connection")
-        public Boolean tlsAllowInsecureConnection = null;
-
         @Parameter(names = { "-time",
                 "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
         public long testTime = 0;
 
-        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
-                + "used for handling connections to brokers, default is 1 thread")
-        public int ioThreads = 1;
-
-        @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
-                + " to be used for message listeners")
-        public int listenerThreads = 1;
+        @Override
+        public void fillArgumentsFromProperties(Properties prop) {
+            if (!useTls) {
+                useTls = Boolean.parseBoolean(prop.getProperty("useTls"));
+            }
+        }
     }
 
     public static void main(String[] args) throws Exception {
@@ -178,45 +135,7 @@ public class PerformanceReader {
                 PerfClientUtils.exit(-1);
             }
         }
-
-        if (arguments.confFile != null) {
-            Properties prop = new Properties(System.getProperties());
-            prop.load(new FileInputStream(arguments.confFile));
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
-            }
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("webServiceUrl");
-            }
-
-            // fallback to previous-version serviceUrl property to maintain backward-compatibility
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
-            }
-
-            if (arguments.authPluginClassName == null) {
-                arguments.authPluginClassName = prop.getProperty("authPlugin", null);
-            }
-
-            if (arguments.authParams == null) {
-                arguments.authParams = prop.getProperty("authParams", null);
-            }
-
-            if (!arguments.useTls) {
-                arguments.useTls = Boolean.parseBoolean(prop.getProperty("useTls"));
-            }
-
-            if (isBlank(arguments.tlsTrustCertsFilePath)) {
-                arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
-            }
-
-            if (arguments.tlsAllowInsecureConnection == null) {
-                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop
-                        .getProperty("tlsAllowInsecureConnection", ""));
-            }
-        }
+        arguments.fillArgumentsFromProperties();
 
         // Dump config variables
         PerfClientUtils.printJVMInformation(log);
@@ -257,27 +176,8 @@ public class PerformanceReader {
             }
         };
 
-        ClientBuilder clientBuilder = PulsarClient.builder() //
-                .memoryLimit(0, SizeUnit.BYTES)
-                .serviceUrl(arguments.serviceURL) //
-                .connectionsPerBroker(arguments.maxConnections) //
-                .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
-                .ioThreads(arguments.ioThreads) //
-                .listenerThreads(arguments.listenerThreads)
-                .enableTls(arguments.useTls) //
-                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-
-        if (isNotBlank(arguments.authPluginClassName)) {
-            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
-        }
-
-        if (arguments.tlsAllowInsecureConnection != null) {
-            clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
-        }
-
-        if (isNotBlank(arguments.listenerName)) {
-            clientBuilder.listenerName(arguments.listenerName);
-        }
+        ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+                .enableTls(arguments.useTls);
 
         PulsarClient pulsarClient = clientBuilder.build();
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 1258918b18a..1be3644021b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -19,14 +19,12 @@
 package org.apache.pulsar.testclient;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -62,7 +60,6 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -97,13 +94,7 @@ public class PerformanceTransaction {
 
 
     @Parameters(commandDescription = "Test pulsar transaction performance.")
-    static class Arguments {
-
-        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
-        boolean help;
-
-        @Parameter(names = { "-cf", "--conf-file" }, description = "Configuration file")
-        public String confFile;
+    static class Arguments  extends PerformanceBaseArguments {
 
         @Parameter(names = "--topics-c", description = "All topics that need ack for a transaction", required =
                 true)
@@ -120,39 +111,18 @@ public class PerformanceTransaction {
                 + "thereby increasing the intensity of the stress test.")
         public int numTestThreads = 1;
 
-        @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
-        public String authPluginClassName;
-
-        @Parameter(
-                names = { "--auth-params" },
-                description = "Authentication parameters, whose format is determined by the implementation "
-                        + "of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" "
-                        + "or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
-        public String authParams;
-
         @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
         public String adminURL;
 
-        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
-        public String serviceURL;
-
         @Parameter(names = {"-np",
                 "--partitions"}, description = "Create partitioned topics with a given number of partitions, 0 means"
                 + "not trying to create a topic")
         public Integer partitions = null;
 
-        @Parameter(names = {"-c",
-                "--max-connections"}, description = "Max number of TCP connections to a single broker")
-        public int maxConnections = 1;
-
         @Parameter(names = {"-time",
                 "--test-duration"}, description = "Test duration (in second). 0 means keeping publishing")
         public long testTime = 0;
 
-        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be "
-                + "used for handling connections to brokers. The default value is 1.")
-        public int ioThreads = 1;
-
         @Parameter(names = {"-ss",
                 "--subscriptions"}, description = "A list of subscriptions to consume (for example, sub1,sub2)")
         public List<String> subscriptions = Collections.singletonList("sub");
@@ -198,6 +168,16 @@ public class PerformanceTransaction {
 
         @Parameter(names = "-txnRate", description = "Set the rate of opened transaction or task. 0 means no limit")
         public int openTxnRate = 0;
+
+        @Override
+        public void fillArgumentsFromProperties(Properties prop) {
+            if (adminURL == null) {
+                adminURL = prop.getProperty("webServiceUrl");
+            }
+            if (adminURL == null) {
+                adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
+            }
+        }
     }
 
     public static void main(String[] args)
@@ -218,40 +198,7 @@ public class PerformanceTransaction {
             jc.usage();
             PerfClientUtils.exit(-1);
         }
-
-        if (arguments.confFile != null) {
-            Properties prop = new Properties(System.getProperties());
-            prop.load(new FileInputStream(arguments.confFile));
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
-            }
-
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("webServiceUrl");
-            }
-
-            // fallback to previous-version serviceUrl property to maintain backward-compatibility
-            if (arguments.serviceURL == null) {
-                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
-            }
-
-            if (arguments.adminURL == null) {
-                arguments.adminURL = prop.getProperty("webServiceUrl");
-            }
-            if (arguments.adminURL == null) {
-                arguments.adminURL = prop.getProperty("adminURL", "http://localhost:8080/");
-            }
-
-            if (arguments.authPluginClassName == null) {
-                arguments.authPluginClassName = prop.getProperty("authPlugin", null);
-            }
-
-            if (arguments.authParams == null) {
-                arguments.authParams = prop.getProperty("authParams", null);
-            }
-        }
-
+        arguments.fillArgumentsFromProperties();
 
         // Dump config variables
         PerfClientUtils.printJVMInformation(log);
@@ -266,24 +213,20 @@ public class PerformanceTransaction {
             payloadBytes[i] = (byte) (random.nextInt(26) + 65);
         }
         if (arguments.partitions != null) {
-            PulsarAdminBuilder clientBuilder = PulsarAdmin.builder()
-                    .serviceHttpUrl(arguments.adminURL);
+            final PulsarAdminBuilder adminBuilder = PerfClientUtils
+                    .createAdminBuilderFromArguments(arguments, arguments.adminURL);
 
-            if (isNotBlank(arguments.authPluginClassName)) {
-                clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
-            }
-
-            try (PulsarAdmin client = clientBuilder.build()) {
+            try (PulsarAdmin adminClient = adminBuilder.build()) {
                 for (String topic : arguments.producerTopic) {
                     log.info("Creating  produce partitioned topic {} with {} partitions", topic, arguments.partitions);
                     try {
-                        client.topics().createPartitionedTopic(topic, arguments.partitions);
+                        adminClient.topics().createPartitionedTopic(topic, arguments.partitions);
                     } catch (PulsarAdminException.ConflictException alreadyExists) {
                         if (log.isDebugEnabled()) {
                             log.debug("Topic {} already exists: {}", topic, alreadyExists);
                         }
                         PartitionedTopicMetadata partitionedTopicMetadata =
-                                client.topics().getPartitionedTopicMetadata(topic);
+                                adminClient.topics().getPartitionedTopicMetadata(topic);
                         if (partitionedTopicMetadata.partitions != arguments.partitions) {
                             log.error(
                                     "Topic {} already exists but it has a wrong number of partitions: {}, expecting {}",
@@ -295,18 +238,8 @@ public class PerformanceTransaction {
             }
         }
 
-        ClientBuilder clientBuilder =
-                PulsarClient.builder()
-                        .memoryLimit(0, SizeUnit.BYTES)
-                        .enableTransaction(!arguments.isDisableTransaction)
-                        .serviceUrl(arguments.serviceURL)
-                        .connectionsPerBroker(arguments.maxConnections)
-                        .statsInterval(0, TimeUnit.SECONDS)
-                        .ioThreads(arguments.ioThreads);
-
-        if (isNotBlank(arguments.authPluginClassName)) {
-            clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
-        }
+        ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+                        .enableTransaction(!arguments.isDisableTransaction);
 
         PulsarClient client = clientBuilder.build();
 
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerfClientUtilsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerfClientUtilsTest.java
new file mode 100644
index 00000000000..78dbc1fff88
--- /dev/null
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerfClientUtilsTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class PerfClientUtilsTest {
+
+    public static class MyAuth implements Authentication {
+        @Override
+        public String getAuthMethodName() {
+            return null;
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+
+    @Test
+    public void testClientCreation() throws Exception {
+
+        final PerformanceBaseArguments args = new PerformanceBaseArguments() {
+            @Override
+            public void fillArgumentsFromProperties(Properties prop) {
+            }
+        };
+
+        args.tlsHostnameVerificationEnable = true;
+        args.authPluginClassName = MyAuth.class.getName();
+        args.authParams = "params";
+        args.enableBusyWait = true;
+        args.maxConnections = 100;
+        args.ioThreads = 16;
+        args.listenerName = "listener";
+        args.listenerThreads = 12;
+        args.statsIntervalSeconds = Long.MAX_VALUE;
+        args.serviceURL = "pulsar+ssl://my-pulsar:6651";
+        args.tlsTrustCertsFilePath = "path";
+        args.tlsAllowInsecureConnection = true;
+
+        final ClientBuilderImpl builder = (ClientBuilderImpl)PerfClientUtils.createClientBuilderFromArguments(args);
+        final ClientConfigurationData conf = builder.getClientConfigurationData();
+
+        Assert.assertTrue(conf.isTlsHostnameVerificationEnable());
+        Assert.assertEquals(conf.getAuthPluginClassName(), MyAuth.class.getName());
+        Assert.assertEquals(conf.getAuthParams(), "params");
+        Assert.assertTrue(conf.isEnableBusyWait());
+        Assert.assertEquals(conf.getConnectionsPerBroker(), 100);
+        Assert.assertEquals(conf.getNumIoThreads(), 16);
+        Assert.assertEquals(conf.getListenerName(), "listener");
+        Assert.assertEquals(conf.getNumListenerThreads(), 12);
+        Assert.assertEquals(conf.getStatsIntervalSeconds(), Long.MAX_VALUE);
+        Assert.assertEquals(conf.getServiceUrl(), "pulsar+ssl://my-pulsar:6651");
+        Assert.assertEquals(conf.getTlsTrustCertsFilePath(), "path");
+        Assert.assertTrue(conf.isTlsAllowInsecureConnection());
+
+    }
+}
\ No newline at end of file
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
new file mode 100644
index 00000000000..0b244a5a4e1
--- /dev/null
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PerformanceBaseArgumentsTest {
+
+    @Test
+    public void testReadFromConfigFile() {
+
+        AtomicBoolean called = new AtomicBoolean();
+
+        final PerformanceBaseArguments args = new PerformanceBaseArguments() {
+            @Override
+            public void fillArgumentsFromProperties(Properties prop) {
+                called.set(true);
+            }
+        };
+        args.confFile = "./src/test/resources/perf_client1.conf";
+        args.fillArgumentsFromProperties();
+        Assert.assertTrue(called.get());
+        Assert.assertEquals(args.serviceURL, "https://my-pulsar:8443/");
+        Assert.assertEquals(args.authPluginClassName,
+                "org.apache.pulsar.testclient.PerfClientUtilsTest.MyAuth");
+        Assert.assertEquals(args.authParams, "myparams");
+        Assert.assertEquals(args.tlsTrustCertsFilePath, "./path");
+        Assert.assertTrue(args.tlsAllowInsecureConnection);
+        Assert.assertTrue(args.tlsHostnameVerificationEnable);
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-testclient/src/test/resources/perf_client1.conf b/pulsar-testclient/src/test/resources/perf_client1.conf
new file mode 100644
index 00000000000..127960618bf
--- /dev/null
+++ b/pulsar-testclient/src/test/resources/perf_client1.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+brokerServiceUrl=https://my-pulsar:8443/
+authPlugin=org.apache.pulsar.testclient.PerfClientUtilsTest.MyAuth
+authParams=myparams
+tlsTrustCertsFilePath=./path
+tlsAllowInsecureConnection=true
+tlsEnableHostnameVerification=true
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 0e533e10662..19a9f808e23 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -537,6 +537,24 @@ The table below lists the environment variables that you can use to configure th
 |`PULSAR_EXTRA_CLASSPATH`|Extra paths for Pulsar's classpath||
 |`PULSAR_GC_LOG`|Gc options to be passed to the jvm||
 
+Commands `consume`, `produce`, `read` and `transaction` share the following client options:
+
+|Flag|Description|Default|
+|---|---|---|
+|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
+|`--auth-plugin`|Authentication plugin class name||
+|`-bw`, `--busy-wait`|Enable or disable Busy-Wait on the Pulsar client|false|
+|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
+|`-cf`, `--conf-file`|Configuration file||
+|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
+|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
+|`--listener-name`|Listener name for the broker||
+|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
+|`--tls-allow-insecure`|Allow insecure TLS connection||
+|`--tls-hostname-verification`|Enable TLS hostname verification||
+|`--trust-cert-file`|Path for the trusted TLS certificate file||
+|`-u`, `--service-url`|Pulsar service URL||
+
 
 ### `consume`
 Run a consumer
@@ -553,23 +571,14 @@ Options
 
 |Flag|Description|Default|
 |---|---|---|
-|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
-|`--auth-plugin`|Authentication plugin class name||
 |`-ac`, `--auto_ack_chunk_q_full`|Auto ack for the oldest message in consumer's receiver queue if the queue full|false|
-|`--listener-name`|Listener name for the broker||
 |`--acks-delay-millis`|Acknowledgements grouping delay in millis|100|
 |`--batch-index-ack`|Enable or disable the batch index acknowledgment|false|
-|`-bw`, `--busy-wait`|Enable or disable Busy-Wait on the Pulsar client|false|
 |`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
-|`-h`, `--help`|Help message|false|
-|`-cf`, `--conf-file`|Configuration file||
 |`-m`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
 |`-e`, `--expire_time_incomplete_chunked_messages`|The expiration time for incomplete chunk messages (in milliseconds)|0|
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
 |`-mc`, `--max_chunked_msg`|Max pending chunk messages|0|
 |`-n`, `--num-consumers`|Number of consumers (per topic)|1|
-|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
-|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
 |`-ns`, `--num-subscriptions`|Number of subscriptions (per topic)|1|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-pm`, `--pool-messages`|Use the pooled message|true|
@@ -577,15 +586,12 @@ Options
 |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
 |`-p`, `--receiver-queue-size-across-partitions`|Max total size of the receiver queue across partitions|50000|
 |`--replicated`|Whether the subscription status should be replicated|false|
-|`-u`, `--service-url`|Pulsar service URL||
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
 |`-s`, `--subscriber-name`|Subscriber name prefix||
 |`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
 |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
 |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
 |`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
-|`--trust-cert-file`|Path for the trusted TLS certificate file||
-|`--tls-allow-insecure`|Allow insecure TLS connection||
+
 
 Below are **transaction** related options.
 
@@ -616,29 +622,22 @@ Options
 |---|---|---|
 |`-am`, `--access-mode`|Producer access mode. Valid values are `Shared`, `Exclusive` and `WaitForExclusive`|Shared|
 |`-au`, `--admin-url`|Pulsar admin URL||
-|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
-|`--auth-plugin`|Authentication plugin class name||
-|`--listener-name`|Listener name for the broker||
 |`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
 |`-bb`, `--batch-max-bytes`|Maximum number of bytes per batch|4194304|
 |`-bm`, `--batch-max-messages`|Maximum number of messages per batch|1000|
-|`-bw`, `--busy-wait`|Enable or disable Busy-Wait on the Pulsar client|false|
 |`-ch`, `--chunking`|Split the message and publish in chunks if the message size is larger than allowed max size|false|
 |`-d`, `--delay`|Mark messages with a given delay in seconds|0s|
 |`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
-|`-cf`, `--conf-file`|Configuration file||
 |`-k`, `--encryption-key-name`|The public key name to encrypt payload||
 |`-v`, `--encryption-key-value-file`|The file which contains the public key to encrypt payload||
 |`-ef`, `--exit-on-failure`|Exit from the process on publish failure|false|
 |`-fc`, `--format-class`|Custom Formatter class name|org.apache.pulsar.testclient.DefaultMessageFormatter|
 |`-fp`, `--format-payload`|Format %i as the message index in the stream from the producer and/or %t as the timestamp in nanoseconds|false|
 |`-h`, `--help`|Help message|false|
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
 |`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
 |`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000|
 |`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
 |`-mk`, `--message-key-generation-mode`|The generation mode of the message key. Valid options are `autoIncrement` and `random`||
-|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
 |`-n`, `--num-producers`|The number of producers (per topic)|1|
 |`-threads`, `--num-test-threads`|Number of test threads|1|
 |`-t`, `--num-topic`|The number of topics|1|
@@ -649,13 +648,9 @@ Options
 |`-r`, `--rate`|Publish rate msg/s across topics|100|
 |`--send-timeout`|Set the sendTimeout|0|
 |`--separator`|Separator between the topic and topic number|-|
-|`-u`, `--service-url`|Pulsar service URL||
 |`-s`, `--size`|Message size (in bytes)|1024|
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
 |`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
-|`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--warmup-time`|Warm-up time in seconds|1|
-|`--tls-allow-insecure`|Allow insecure TLS connection||
 
 Below are **transaction** related options.
 
@@ -683,25 +678,14 @@ Options
 
 |Flag|Description|Default|
 |---|---|---|
-|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.||
-|`--auth-plugin`|Authentication plugin class name||
-|`--listener-name`|Listener name for the broker||
-|`-cf`, `--conf-file`|Configuration file||
 |`-h`, `--help`|Help message|false|
 |`-n`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
-|`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
-|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
 |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
-|`-u`, `--service-url`|Pulsar service URL||
 |`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest|
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
 |`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
-|`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--use-tls`|Use TLS encryption on the connection|false|
-|`--tls-allow-insecure`|Allow insecure TLS connection||
 
 ### `websocket-producer`
 Run a websocket producer
@@ -839,13 +823,8 @@ $ pulsar-perf transaction options
 
 |Flag|Description|Default|
 |---|---|---|
-`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class. For example, `key1:val1,key2:val2` or `{"key1":"val1","key2":"val2"}`.|N/A
-`--auth-plugin`|Authentication plugin class name.|N/A
 `-au`, `--admin-url`|Pulsar admin URL.|N/A
-`-cf`, `--conf-file`|Configuration file.|N/A
 `-h`, `--help`|Help message.|N/A
-`-c`, `--max-connections`|Maximum number of TCP connections to a single broker.|100
-`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers. |1
 `-ns`, `--num-subscriptions`|Number of subscriptions per topic.|1
 `-threads`, `--num-test-threads`|Number of test threads. <br /><br />Each test thread starts a new transaction that acks messages from consumer topics, produces messages to producer topics, and commits or aborts the transaction. <br /><br /> Increasing the number of threads increases the parallelism of the performance test and, consequently, the intensity of the stress test.|1
 `-nmc`, `--numMessage-perTransaction-consume`|Set the number of messages consumed in a transaction. <br /><br /> If transaction is disabled, it means the number of messages consumed in a task instead of in a transaction.|1
@@ -853,7 +832,6 @@ $ pulsar-perf transaction options
 `-ntxn`, `--number-txn`|Set the number of transactions. <br /><br /> 0 means the number of transactions is unlimited. <br /><br /> If transaction is disabled, it means the number of tasks instead of transactions. |0
 `-np`, `--partitions`|Create partitioned topics with a given number of partitions. <br /><br /> 0 means not trying to create a topic.
 `-q`, `--receiver-queue-size`|Size of the receiver queue.|1000
-`-u`, `--service-url`|Pulsar service URL.|N/A
 `-sp`, `--subscription-position`|Subscription position.|Earliest
 `-st`, `--subscription-type`|Subscription type.|Shared
 `-ss`, `--subscriptions`|A list of subscriptions to consume. <br /><br /> For example, sub1,sub2.|[sub]