You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/08 11:19:50 UTC
[pulsar] branch master updated: Enable CLI to publish non-batched messages (#12641)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1bad71 Enable CLI to publish non-batched messages (#12641)
a1bad71 is described below
commit a1bad71728d0b39e5b38fdd4ea9a7578629ac975
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Mon Nov 8 20:19:02 2021 +0900
Enable CLI to publish non-batched messages (#12641)
### Motivation
Currently, messages produced by the `pulsar-client` command are always batched. However, zero-queue consumers (consumers whose receiver queue size is 0) cannot receive batched messages, so it would be useful to have an easy way to produce non-batched messages.
### Modifications
Added a `--disable-batching` (`-db`) option to the `pulsar-client produce` command that disables batching for the messages it sends:
```sh
$ ./bin/pulsar-client produce -m hello -n 10 --disable-batching persistent://public/default/t1
```
---
.../pulsar/client/cli/PulsarClientToolTest.java | 46 ++++++++++++++++++++--
.../org/apache/pulsar/client/cli/CmdProduce.java | 5 +++
site2/docs/reference-cli-tools.md | 1 +
3 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index d563089..b1e0670 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -33,6 +33,9 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -106,7 +109,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
- final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+ final String topicName = getTopicWithRandomSuffix("non-durable");
int numberOfMessages = 10;
@Cleanup("shutdownNow")
@@ -155,7 +158,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
- final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+ final String topicName = getTopicWithRandomSuffix("durable");
int numberOfMessages = 10;
@Cleanup("shutdownNow")
@@ -197,7 +200,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
- final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+ final String topicName = getTopicWithRandomSuffix("encryption");
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;
@@ -234,4 +237,41 @@ public class PulsarClientToolTest extends BrokerTestBase {
}
}
+ @Test(timeOut = 20000)
+ public void testDisableBatching() throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("useTls", "false");
+
+ final String topicName = getTopicWithRandomSuffix("disable-batching");
+ final int numberOfMessages = 5;
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+
+ PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
+ String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
+ Assert.assertEquals(pulsarClientTool1.run(args1), 0);
+
+ PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
+ String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
+ Assert.assertEquals(pulsarClientTool2.run(args2), 0);
+
+ for (int i = 0; i < numberOfMessages * 2; i++) {
+ Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ if (i < numberOfMessages) {
+ Assert.assertEquals(new String(msg.getData()), "batched");
+ Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl);
+ } else {
+ Assert.assertEquals(new String(msg.getData()), "non-batched");
+ Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl);
+ }
+ }
+ }
+
+ private static String getTopicWithRandomSuffix(String localNameBase) {
+ return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString());
+ }
+
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 1ae2d38..0ec22ba 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -107,6 +107,9 @@ public class CmdProduce {
description = "Rate (in msg/sec) at which to produce," +
" value 0 means to produce messages as fast as possible.")
private double publishRate = 0;
+
+ @Parameter(names = { "-db", "--disable-batching" }, description = "Disable batch sending of messages")
+ private boolean disableBatching = false;
@Parameter(names = { "-c",
"--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
@@ -247,6 +250,8 @@ public class CmdProduce {
if (this.chunkingAllowed) {
producerBuilder.enableChunking(true);
producerBuilder.enableBatching(false);
+ } else if (this.disableBatching) {
+ producerBuilder.enableBatching(false);
}
if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
producerBuilder.addEncryptionKey(this.encKeyName);
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 6eb7555..df41ffd 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -312,6 +312,7 @@ Options
|`-m`, `--messages`|Comma-separated string of messages to send; either -m or -f must be specified|[]|
|`-n`, `--num-produce`|The number of times to send the message(s); the count of messages/files * num-produce should be below 1000|1|
|`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0|
+|`-db`, `--disable-batching`|Disable batch sending of messages|false|
|`-c`, `--chunking`|Split the message and publish in chunks if the message size is larger than the allowed max size|false|
|`-s`, `--separator`|Character to split messages string with.|","|
|`-k`, `--key`|Message key to add|key=value string, like k1=v1,k2=v2.|