You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/09/23 02:15:03 UTC

[pulsar] branch master updated: [testclient] Improve parameter checking in pulsar-perf (#11973)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9516e5d  [testclient] Improve parameter checking in pulsar-perf (#11973)
9516e5d is described below

commit 9516e5db1a250d39101d0c7acda6d1fd8366c8d3
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Thu Sep 23 10:14:11 2021 +0800

    [testclient] Improve parameter checking in pulsar-perf (#11973)
---
 .../apache/pulsar/client/api/ClientBuilder.java    |  2 +-
 .../pulsar/client/impl/ClientBuilderImpl.java      |  6 ++++
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  1 +
 .../client/impl/conf/ClientConfigurationData.java  |  1 +
 .../proxy/socket/client/PerformanceClient.java     |  7 +++--
 .../pulsar/testclient/ManagedLedgerWriter.java     |  8 +++---
 .../pulsar/testclient/PerformanceConsumer.java     |  8 +++---
 .../pulsar/testclient/PerformanceProducer.java     | 12 ++++----
 .../pulsar/testclient/PerformanceReader.java       |  4 +--
 .../PositiveNumberParameterValidator.java          | 32 ++++++++++++++++++++++
 site2/docs/reference-cli-tools.md                  | 14 ++++++----
 11 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 42a63d3..690dfc5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -263,7 +263,7 @@ public interface ClientBuilder extends Serializable, Cloneable {
      * Increasing this parameter may improve throughput when using many producers over a high latency connection.
      *
      * @param connectionsPerBroker
-     *            max number of connections per broker (needs to be greater than 0)
+     *            max number of connections per broker (needs to be greater than or equal to 0)
      * @return the client builder instance
      */
     ClientBuilder connectionsPerBroker(int connectionsPerBroker);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index c8a4376..2a0542e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -37,6 +37,8 @@ import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public class ClientBuilderImpl implements ClientBuilder {
     ClientConfigurationData conf;
 
@@ -140,6 +142,7 @@ public class ClientBuilderImpl implements ClientBuilder {
 
     @Override
     public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
+        checkArgument(operationTimeout >= 0, "operationTimeout needs to be >= 0");
         conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
         return this;
     }
@@ -152,18 +155,21 @@ public class ClientBuilderImpl implements ClientBuilder {
 
     @Override
     public ClientBuilder ioThreads(int numIoThreads) {
+        checkArgument(numIoThreads > 0, "ioThreads needs to be > 0");
         conf.setNumIoThreads(numIoThreads);
         return this;
     }
 
     @Override
     public ClientBuilder listenerThreads(int numListenerThreads) {
+        checkArgument(numListenerThreads > 0, "listenerThreads needs to be > 0");
         conf.setNumListenerThreads(numListenerThreads);
         return this;
     }
 
     @Override
     public ClientBuilder connectionsPerBroker(int connectionsPerBroker) {
+        checkArgument(connectionsPerBroker >= 0, "connectionsPerBroker needs to be >= 0");
         conf.setConnectionsPerBroker(connectionsPerBroker);
         return this;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 4305bb6..dfcba2f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -174,6 +174,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
 
     @Override
     public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
+        checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
         conf.setReceiverQueueSize(receiverQueueSize);
         return this;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 60b3370..799e61c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -122,6 +122,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     @ApiModelProperty(
             name = "connectionsPerBroker",
             value = "Number of connections established between the client and each Broker."
+                    + "A value of 0 means to disable connection pooling."
     )
     private int connectionsPerBroker = 1;
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 782a3a3..8ef5580 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.testclient.PositiveNumberParameterValidator;
 import org.apache.pulsar.testclient.PerfClientUtils;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
@@ -92,7 +93,7 @@ public class PerformanceClient {
         @Parameter(names = { "-s", "--size" }, description = "Message size in byte")
         public int msgSize = 1024;
 
-        @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
+        @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
         public int numTopics = 1;
 
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
@@ -106,14 +107,14 @@ public class PerformanceClient {
         public String authParams;
 
         @Parameter(names = { "-m",
-                "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
+                "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
         public long numMessages = 0;
 
         @Parameter(names = { "-f", "--payload-file" }, description = "Use payload from a file instead of empty buffer")
         public String payloadFilename = null;
 
         @Parameter(names = { "-time",
-                "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
+                "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
         public long testTime = 0;
     }
 
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 464c59e..0bb31c2 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -90,10 +90,10 @@ public class ManagedLedgerWriter {
         @Parameter(names = { "-s", "--size" }, description = "Message size")
         public int msgSize = 1024;
 
-        @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers")
+        @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers", validateWith = PositiveNumberParameterValidator.class)
         public int numManagedLedgers = 1;
 
-        @Parameter(names = { "--threads" }, description = "Number of threads writing")
+        @Parameter(names = { "--threads" }, description = "Number of threads writing", validateWith = PositiveNumberParameterValidator.class)
         public int numThreads = 1;
 
         @Parameter(names = { "-zk", "--zookeeperServers" }, description = "ZooKeeper connection string", required = true)
@@ -107,7 +107,7 @@ public class ManagedLedgerWriter {
         public int maxConnections = 1;
 
         @Parameter(names = { "-m",
-                "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
+                "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
         public long numMessages = 0;
 
         @Parameter(names = { "-e", "--ensemble-size" }, description = "Ledger ensemble size")
@@ -123,7 +123,7 @@ public class ManagedLedgerWriter {
         public DigestType digestType = DigestType.CRC32C;
 
         @Parameter(names = { "-time",
-                "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
+                "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
         public long testTime = 0;
 
     }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 3b8973e..e9315dc 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -82,13 +82,13 @@ public class PerformanceConsumer {
         @Parameter(description = "persistent://prop/ns/my-topic", required = true)
         public List<String> topic;
 
-        @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
+        @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
         public int numTopics = 1;
 
-        @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive")
+        @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = PositiveNumberParameterValidator.class)
         public int numConsumers = 1;
 
-        @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
+        @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)", validateWith = PositiveNumberParameterValidator.class)
         public int numSubscriptions = 1;
 
         @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true)
@@ -170,7 +170,7 @@ public class PerformanceConsumer {
         public String encKeyFile = null;
 
         @Parameter(names = { "-time",
-                "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+                "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
         public long testTime = 0;
 
         @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 208f3a8..cf830d1 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -108,7 +108,7 @@ public class PerformanceProducer {
         @Parameter(description = "persistent://prop/ns/my-topic", required = true)
         public List<String> topics;
 
-        @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads")
+        @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = PositiveNumberParameterValidator.class)
         public int numTestThreads = 1;
 
         @Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics")
@@ -117,10 +117,10 @@ public class PerformanceProducer {
         @Parameter(names = { "-s", "--size" }, description = "Message size (bytes)")
         public int msgSize = 1024;
 
-        @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
+        @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
         public int numTopics = 1;
 
-        @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)")
+        @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = PositiveNumberParameterValidator.class)
         public int numProducers = 1;
 
         @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
@@ -169,7 +169,7 @@ public class PerformanceProducer {
         public int maxConnections = 100;
 
         @Parameter(names = { "-m",
-                "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
+                "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
         public long numMessages = 0;
 
         @Parameter(names = { "-i",
@@ -201,7 +201,7 @@ public class PerformanceProducer {
         public int batchMaxBytes = 4 * 1024 * 1024;
 
         @Parameter(names = { "-time",
-                "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
+                "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
         public long testTime = 0;
 
         @Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)")
@@ -538,7 +538,7 @@ public class PerformanceProducer {
                 producerBuilder.producerName(producerName);
             }
 
-            if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages == 0) {
+            if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) {
                 producerBuilder.enableBatching(false);
             } else {
                 long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 91d840e..150c7f1 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -72,7 +72,7 @@ public class PerformanceReader {
         @Parameter(description = "persistent://prop/ns/my-topic", required = true)
         public List<String> topic;
 
-        @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
+        @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
         public int numTopics = 1;
 
         @Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message reader (rate in msg/s)")
@@ -126,7 +126,7 @@ public class PerformanceReader {
         public Boolean tlsAllowInsecureConnection = null;
 
         @Parameter(names = { "-time",
-                "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+                "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
         public long testTime = 0;
 
         @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java
new file mode 100644
index 0000000..523d9c1
--- /dev/null
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.ParameterException;
+
+public class PositiveNumberParameterValidator implements IParameterValidator {
+
+    @Override
+    public void validate(String name, String value) throws ParameterException {
+        if (Integer.parseInt(value) <= 0) {
+            throw new ParameterException("Parameter " + name + " should be > 0 (found " + value + ")");
+        }
+    }
+}
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index cc8b20a..9dba486 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -449,6 +449,7 @@ Options
 |`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
 |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
 |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--tls-allow-insecure`|Allow insecure TLS connection||
 
@@ -478,7 +479,7 @@ Options
 |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
 |`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
 |`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000|
-|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
+|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
 |`-n`, `--num-producers`|The number of producers (per topic)|1|
 |`-t`, `--num-topic`|The number of topics|1|
 |`-f`, `--payload-file`|Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages||
@@ -487,7 +488,7 @@ Options
 |`-u`, `--service-url`|Pulsar service URL||
 |`-s`, `--size`|Message size (in bytes)|1024|
 |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
-|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--warmup-time`|Warm-up time in seconds|1|
 |`--tls-allow-insecure`|Allow insecure TLS connection||
@@ -517,6 +518,7 @@ Options
 |`-u`, `--service-url`|Pulsar service URL||
 |`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest|
 |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
 |`--use-tls`|Use TLS encryption on the connection|false|
 |`--tls-allow-insecure`|Allow insecure TLS connection||
@@ -536,13 +538,13 @@ Options
 |`--auth_plugin`|Authentication plugin class name||
 |`--conf-file`|Configuration file||
 |`-h`, `--help`|Help message|false|
-|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0|
+|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
 |`-t`, `--num-topic`|The number of topics|1|
 |`-f`, `--payload-file`|Use payload from a file instead of empty buffer||
 |`-u`, `--proxy-url`|Pulsar Proxy URL, e.g., "ws://localhost:8080/"||
 |`-r`, `--rate`|Publish rate msg/s across topics|100|
 |`-s`, `--size`|Message size in byte|1024|
-|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
 
 
 ### `managed-ledger`
@@ -562,11 +564,11 @@ Options
 |`-h`, `--help`|Help message|false|
 |`-c`, `--max-connections`|Max number of TCP connections to a single bookie|1|
 |`-o`, `--max-outstanding`|Max number of outstanding requests|1000|
-|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0|
+|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
 |`-t`, `--num-topic`|Number of managed ledgers|1|
 |`-r`, `--rate`|Write rate msg/s across managed ledgers|100|
 |`-s`, `--size`|Message size in byte|1024|
-|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
 |`--threads`|Number of threads writing|1|
 |`-w`, `--write-quorum`|Ledger write quorum|1|
 |`-zk`, `--zookeeperServers`|ZooKeeper connection string||