You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/09/23 02:15:03 UTC
[pulsar] branch master updated: [testclient] Improve parameter checking in pulsar-perf (#11973)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9516e5d [testclient] Improve parameter checking in pulsar-perf (#11973)
9516e5d is described below
commit 9516e5db1a250d39101d0c7acda6d1fd8366c8d3
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Thu Sep 23 10:14:11 2021 +0800
[testclient] Improve parameter checking in pulsar-perf (#11973)
---
.../apache/pulsar/client/api/ClientBuilder.java | 2 +-
.../pulsar/client/impl/ClientBuilderImpl.java | 6 ++++
.../pulsar/client/impl/ReaderBuilderImpl.java | 1 +
.../client/impl/conf/ClientConfigurationData.java | 1 +
.../proxy/socket/client/PerformanceClient.java | 7 +++--
.../pulsar/testclient/ManagedLedgerWriter.java | 8 +++---
.../pulsar/testclient/PerformanceConsumer.java | 8 +++---
.../pulsar/testclient/PerformanceProducer.java | 12 ++++----
.../pulsar/testclient/PerformanceReader.java | 4 +--
.../PositiveNumberParameterValidator.java | 32 ++++++++++++++++++++++
site2/docs/reference-cli-tools.md | 14 ++++++----
11 files changed, 69 insertions(+), 26 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 42a63d3..690dfc5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -263,7 +263,7 @@ public interface ClientBuilder extends Serializable, Cloneable {
* Increasing this parameter may improve throughput when using many producers over a high latency connection.
*
* @param connectionsPerBroker
- * max number of connections per broker (needs to be greater than 0)
+ * max number of connections per broker (needs to be greater than or equal to 0)
* @return the client builder instance
*/
ClientBuilder connectionsPerBroker(int connectionsPerBroker);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index c8a4376..2a0542e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -37,6 +37,8 @@ import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import static com.google.common.base.Preconditions.checkArgument;
+
public class ClientBuilderImpl implements ClientBuilder {
ClientConfigurationData conf;
@@ -140,6 +142,7 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
+ checkArgument(operationTimeout >= 0, "operationTimeout needs to be >= 0");
conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
return this;
}
@@ -152,18 +155,21 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder ioThreads(int numIoThreads) {
+ checkArgument(numIoThreads > 0, "ioThreads needs to be > 0");
conf.setNumIoThreads(numIoThreads);
return this;
}
@Override
public ClientBuilder listenerThreads(int numListenerThreads) {
+ checkArgument(numListenerThreads > 0, "listenerThreads needs to be > 0");
conf.setNumListenerThreads(numListenerThreads);
return this;
}
@Override
public ClientBuilder connectionsPerBroker(int connectionsPerBroker) {
+ checkArgument(connectionsPerBroker >= 0, "connectionsPerBroker needs to be >= 0");
conf.setConnectionsPerBroker(connectionsPerBroker);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 4305bb6..dfcba2f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -174,6 +174,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
@Override
public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
+ checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 60b3370..799e61c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -122,6 +122,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
@ApiModelProperty(
name = "connectionsPerBroker",
value = "Number of connections established between the client and each Broker."
+ + "A value of 0 means to disable connection pooling."
)
private int connectionsPerBroker = 1;
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 782a3a3..8ef5580 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.testclient.PositiveNumberParameterValidator;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
@@ -92,7 +93,7 @@ public class PerformanceClient {
@Parameter(names = { "-s", "--size" }, description = "Message size in byte")
public int msgSize = 1024;
- @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
+ @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
@@ -106,14 +107,14 @@ public class PerformanceClient {
public String authParams;
@Parameter(names = { "-m",
- "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
+ "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
public long numMessages = 0;
@Parameter(names = { "-f", "--payload-file" }, description = "Use payload from a file instead of empty buffer")
public String payloadFilename = null;
@Parameter(names = { "-time",
- "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
+ "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 464c59e..0bb31c2 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -90,10 +90,10 @@ public class ManagedLedgerWriter {
@Parameter(names = { "-s", "--size" }, description = "Message size")
public int msgSize = 1024;
- @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers")
+ @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers", validateWith = PositiveNumberParameterValidator.class)
public int numManagedLedgers = 1;
- @Parameter(names = { "--threads" }, description = "Number of threads writing")
+ @Parameter(names = { "--threads" }, description = "Number of threads writing", validateWith = PositiveNumberParameterValidator.class)
public int numThreads = 1;
@Parameter(names = { "-zk", "--zookeeperServers" }, description = "ZooKeeper connection string", required = true)
@@ -107,7 +107,7 @@ public class ManagedLedgerWriter {
public int maxConnections = 1;
@Parameter(names = { "-m",
- "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
+ "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
public long numMessages = 0;
@Parameter(names = { "-e", "--ensemble-size" }, description = "Ledger ensemble size")
@@ -123,7 +123,7 @@ public class ManagedLedgerWriter {
public DigestType digestType = DigestType.CRC32C;
@Parameter(names = { "-time",
- "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
+ "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 3b8973e..e9315dc 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -82,13 +82,13 @@ public class PerformanceConsumer {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topic;
- @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
+ @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;
- @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive")
+ @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = PositiveNumberParameterValidator.class)
public int numConsumers = 1;
- @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
+ @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)", validateWith = PositiveNumberParameterValidator.class)
public int numSubscriptions = 1;
@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true)
@@ -170,7 +170,7 @@ public class PerformanceConsumer {
public String encKeyFile = null;
@Parameter(names = { "-time",
- "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+ "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
public long testTime = 0;
@Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 208f3a8..cf830d1 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -108,7 +108,7 @@ public class PerformanceProducer {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topics;
- @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads")
+ @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = PositiveNumberParameterValidator.class)
public int numTestThreads = 1;
@Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics")
@@ -117,10 +117,10 @@ public class PerformanceProducer {
@Parameter(names = { "-s", "--size" }, description = "Message size (bytes)")
public int msgSize = 1024;
- @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
+ @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;
- @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)")
+ @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = PositiveNumberParameterValidator.class)
public int numProducers = 1;
@Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
@@ -169,7 +169,7 @@ public class PerformanceProducer {
public int maxConnections = 100;
@Parameter(names = { "-m",
- "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
+ "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
public long numMessages = 0;
@Parameter(names = { "-i",
@@ -201,7 +201,7 @@ public class PerformanceProducer {
public int batchMaxBytes = 4 * 1024 * 1024;
@Parameter(names = { "-time",
- "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
+ "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;
@Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)")
@@ -538,7 +538,7 @@ public class PerformanceProducer {
producerBuilder.producerName(producerName);
}
- if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages == 0) {
+ if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) {
producerBuilder.enableBatching(false);
} else {
long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 91d840e..150c7f1 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -72,7 +72,7 @@ public class PerformanceReader {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topic;
- @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
+ @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;
@Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message reader (rate in msg/s)")
@@ -126,7 +126,7 @@ public class PerformanceReader {
public Boolean tlsAllowInsecureConnection = null;
@Parameter(names = { "-time",
- "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+ "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
public long testTime = 0;
@Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java
new file mode 100644
index 0000000..523d9c1
--- /dev/null
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.ParameterException;
+
+public class PositiveNumberParameterValidator implements IParameterValidator {
+
+ @Override
+ public void validate(String name, String value) throws ParameterException {
+ if (Integer.parseInt(value) <= 0) {
+ throw new ParameterException("Parameter " + name + " should be > 0 (found " + value + ")");
+ }
+ }
+}
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index cc8b20a..9dba486 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -449,6 +449,7 @@ Options
|`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--tls-allow-insecure`|Allow insecure TLS connection||
@@ -478,7 +479,7 @@ Options
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
|`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000|
-|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
+|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-n`, `--num-producers`|The number of producers (per topic)|1|
|`-t`, `--num-topic`|The number of topics|1|
|`-f`, `--payload-file`|Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages||
@@ -487,7 +488,7 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-s`, `--size`|Message size (in bytes)|1024|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
-|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--warmup-time`|Warm-up time in seconds|1|
|`--tls-allow-insecure`|Allow insecure TLS connection||
@@ -517,6 +518,7 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--use-tls`|Use TLS encryption on the connection|false|
|`--tls-allow-insecure`|Allow insecure TLS connection||
@@ -536,13 +538,13 @@ Options
|`--auth_plugin`|Authentication plugin class name||
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
-|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0|
+|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-t`, `--num-topic`|The number of topics|1|
|`-f`, `--payload-file`|Use payload from a file instead of empty buffer||
|`-u`, `--proxy-url`|Pulsar Proxy URL, e.g., "ws://localhost:8080/"||
|`-r`, `--rate`|Publish rate msg/s across topics|100|
|`-s`, `--size`|Message size in byte|1024|
-|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
### `managed-ledger`
@@ -562,11 +564,11 @@ Options
|`-h`, `--help`|Help message|false|
|`-c`, `--max-connections`|Max number of TCP connections to a single bookie|1|
|`-o`, `--max-outstanding`|Max number of outstanding requests|1000|
-|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0|
+|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-t`, `--num-topic`|Number of managed ledgers|1|
|`-r`, `--rate`|Write rate msg/s across managed ledgers|100|
|`-s`, `--size`|Message size in byte|1024|
-|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0|
+|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
|`--threads`|Number of threads writing|1|
|`-w`, `--write-quorum`|Ledger write quorum|1|
|`-zk`, `--zookeeperServers`|ZooKeeper connection string||