You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 09:31:24 UTC

[pulsar] 01/12: Enable CLI to publish non-batched messages (#12641)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 97c79e20be5e291d6de03daecd2507d96e0b27f9
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Mon Nov 8 20:19:02 2021 +0900

    Enable CLI to publish non-batched messages (#12641)
    
    ### Motivation
    
    Currently, messages produced by the `pulsar-client` command are always batched. However, consumers whose receiver queue size is zero (zero-queue consumers) cannot receive batched messages, so it would be useful to have an easy way to produce non-batched messages from the CLI.
    
    ### Modifications
    
    Added a `--disable-batching` (`-db`) option to the `pulsar-client produce` command that disables batching:
    ```sh
    $ ./bin/pulsar-client produce -m hello -n 10 --disable-batching persistent://public/default/t1
    ```
    
    (cherry picked from commit a1bad71728d0b39e5b38fdd4ea9a7578629ac975)
---
 .../pulsar/client/cli/PulsarClientToolTest.java    | 46 ++++++++++++++++++++--
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  5 +++
 site2/docs/reference-cli-tools.md                  |  1 +
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index fcc6810..52dcf96 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -32,6 +32,9 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -114,7 +117,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("serviceUrl", brokerUrl.toString());
         properties.setProperty("useTls", "false");
 
-        final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+        final String topicName = getTopicWithRandomSuffix("non-durable");
 
         int numberOfMessages = 10;
         @Cleanup("shutdownNow")
@@ -169,7 +172,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("serviceUrl", brokerUrl.toString());
         properties.setProperty("useTls", "false");
 
-        final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+        final String topicName = getTopicWithRandomSuffix("durable");
 
         int numberOfMessages = 10;
         @Cleanup("shutdownNow")
@@ -219,7 +222,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
         properties.setProperty("serviceUrl", brokerUrl.toString());
         properties.setProperty("useTls", "false");
 
-        final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
+        final String topicName = getTopicWithRandomSuffix("encryption");
         final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
         final int numberOfMessages = 10;
 
@@ -262,4 +265,41 @@ public class PulsarClientToolTest extends BrokerTestBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void testDisableBatching() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("useTls", "false");
+
+        final String topicName = getTopicWithRandomSuffix("disable-batching");
+        final int numberOfMessages = 5;
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+
+        PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
+        String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
+        Assert.assertEquals(pulsarClientTool1.run(args1), 0);
+
+        PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
+        String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
+        Assert.assertEquals(pulsarClientTool2.run(args2), 0);
+
+        for (int i = 0; i < numberOfMessages * 2; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg);
+            if (i < numberOfMessages) {
+                Assert.assertEquals(new String(msg.getData()), "batched");
+                Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl);
+            } else {
+                Assert.assertEquals(new String(msg.getData()), "non-batched");
+                Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl);
+            }
+        }
+    }
+
+    private static String getTopicWithRandomSuffix(String localNameBase) {
+        return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString());
+    }
+
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 326e221..e1d6aca 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -98,6 +98,9 @@ public class CmdProduce {
                description = "Rate (in msg/sec) at which to produce," +
                        " value 0 means to produce messages as fast as possible.")
     private double publishRate = 0;
+
+    @Parameter(names = { "-db", "--disable-batching" }, description = "Disable batch sending of messages")
+    private boolean disableBatching = false;
     
     @Parameter(names = { "-c",
             "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
@@ -216,6 +219,8 @@ public class CmdProduce {
             if (this.chunkingAllowed) {
                 producerBuilder.enableChunking(true);
                 producerBuilder.enableBatching(false);
+            } else if (this.disableBatching) {
+                producerBuilder.enableBatching(false);
             }
             if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
                 producerBuilder.addEncryptionKey(this.encKeyName);
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 04b323a..6ef73df 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -316,6 +316,7 @@ Options
 |`-m`, `--messages`|Comma-separated string of messages to send; either -m or -f must be specified|[]|
 |`-n`, `--num-produce`|The number of times to send the message(s); the count of messages/files * num-produce should be below 1000|1|
 |`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0|
+|`-db`, `--disable-batching`|Disable batch sending of messages|false|
 |`-c`, `--chunking`|Split the message and publish in chunks if the message size is larger than the allowed max size|false|
 |`-s`, `--separator`|Character to split messages string with.|","|
 |`-k`, `--key`|Message key to add|key=value string, like k1=v1,k2=v2.|