You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:22 UTC
[pulsar] 18/26: [fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bdb620369b8ff419c5d27dbb36d530bef472c2fc
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Apr 24 21:01:23 2022 +0800
[fix][tools] Only apply maxPendingMessagesAcrossPartitions if it presents (#15283)
(cherry picked from commit 188d4f4942e549e101757a73aa8785f2f3a2dbd4)
---
.../pulsar/testclient/PerformanceProducer.java | 4 +++-
.../pulsar/testclient/PerformanceProducerTest.java | 21 +++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index d297eb7d7bd..f18f4a84e13 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -597,10 +597,12 @@ public class PerformanceProducer {
.sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
.compressionType(arguments.compression) //
.maxPendingMessages(arguments.maxOutstanding) //
- .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
.accessMode(arguments.producerAccessMode)
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+ if (arguments.maxPendingMessagesAcrossPartitions > 0) {
+ producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
+ }
AtomicReference<Transaction> transactionAtomicReference;
if (arguments.isEnableTransaction) {
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 50174ed4b70..99b615678da 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -190,4 +190,25 @@ public class PerformanceProducerTest extends MockedPulsarServiceBaseTest {
Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter);
}
+ @Test
+ public void testMaxOutstanding() throws Exception {
+ String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000";
+ String topic = testTopic + UUID.randomUUID().toString();
+ String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Key_Shared).subscribe();
+ new Thread(() -> {
+ try {
+ PerformanceProducer.main(args.split(" "));
+ } catch (Exception e) {
+ log.error("Failed to start perf producer");
+ }
+ }).start();
+ Awaitility.await()
+ .untilAsserted(() -> {
+ Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(message);
+ });
+ consumer.close();
+ }
}