You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 04:20:55 UTC
[pulsar] 07/12: [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8840591ea8189f414f0fbd403dc59c68d9dd7e4e
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Jul 20 10:51:35 2022 +0800
[fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)
(cherry picked from commit 42fe0603518be7db7a14802eb4274b6ea22b0c9a)
---
.../PersistentDispatcherMultipleConsumers.java | 3 ++
.../client/api/SimpleProducerConsumerTest.java | 40 ++++++++++++++++++++++
.../client/api/v1/V1_ProducerConsumerTest.java | 2 +-
3 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 33105ac5a3a..a17f0533a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -557,6 +557,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
+ if (c.getMaxUnackedMessages() > 0) {
+ availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
+ }
if (log.isDebugEnabled() && !c.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
+ "availablePermits are {}", topic.getName(), name,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ac0186c3c5a..c90aa577961 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1590,6 +1590,46 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
}
+ @Test(dataProvider = "ackReceiptEnabled")
+ public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
+ final int maxUnacks = 10;
+ pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
+ final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(ackReceiptEnabled)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false)
+ .topic(topic)
+ .create();
+
+ final int messages = 1000;
+ for (int i = 0; i < messages; i++) {
+ producer.sendAsync("Message - " + i);
+ }
+ producer.flush();
+ List<MessageId> receives = new ArrayList<>();
+ for (int i = 0; i < maxUnacks; i++) {
+ Message<String> received = consumer.receive();
+ log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
+ receives.add(received.getMessageId());
+ }
+ assertNull(consumer.receive(3, TimeUnit.SECONDS));
+ consumer.acknowledge(receives);
+ for (int i = 0; i < messages - maxUnacks; i++) {
+ Message<String> received = consumer.receive();
+ log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
+ consumer.acknowledge(received);
+ }
+ }
+
/**
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 55c120592e1..e4cb941c650 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -1708,7 +1708,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
// client should not receive all produced messages and should be blocked due to unack-messages
- assertEquals(messages1.size(), receiverQueueSize);
+ assertEquals(messages1.size(), unAckedMessagesBufferSize);
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());