You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:22 UTC

[pulsar] 18/26: [fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bdb620369b8ff419c5d27dbb36d530bef472c2fc
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Apr 24 21:01:23 2022 +0800

    [fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)
    
    (cherry picked from commit 188d4f4942e549e101757a73aa8785f2f3a2dbd4)
---
 .../pulsar/testclient/PerformanceProducer.java      |  4 +++-
 .../pulsar/testclient/PerformanceProducerTest.java  | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index d297eb7d7bd..f18f4a84e13 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -597,10 +597,12 @@ public class PerformanceProducer {
                     .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
                     .compressionType(arguments.compression) //
                     .maxPendingMessages(arguments.maxOutstanding) //
-                    .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
                     .accessMode(arguments.producerAccessMode)
                     // enable round robin message routing if it is a partitioned topic
                     .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+            if (arguments.maxPendingMessagesAcrossPartitions > 0) {
+                producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
+            }
 
             AtomicReference<Transaction> transactionAtomicReference;
             if (arguments.isEnableTransaction) {
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 50174ed4b70..99b615678da 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -190,4 +190,25 @@ public class PerformanceProducerTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter);
     }
 
+    @Test
+    public void testMaxOutstanding() throws Exception {
+        String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000";
+        String topic = testTopic + UUID.randomUUID().toString();
+        String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Key_Shared).subscribe();
+        new Thread(() -> {
+            try {
+                PerformanceProducer.main(args.split(" "));
+            } catch (Exception e) {
+                log.error("Failed to start perf producer");
+            }
+        }).start();
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+                    assertNotNull(message);
+                });
+        consumer.close();
+    }
 }