You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 04:20:55 UTC

[pulsar] 07/12: [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8840591ea8189f414f0fbd403dc59c68d9dd7e4e
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Jul 20 10:51:35 2022 +0800

    [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)
    
    (cherry picked from commit 42fe0603518be7db7a14802eb4274b6ea22b0c9a)
---
 .../PersistentDispatcherMultipleConsumers.java     |  3 ++
 .../client/api/SimpleProducerConsumerTest.java     | 40 ++++++++++++++++++++++
 .../client/api/v1/V1_ProducerConsumerTest.java     |  2 +-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 33105ac5a3a..a17f0533a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -557,6 +557,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
             // round-robin dispatch batch size for this consumer
             int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
+            if (c.getMaxUnackedMessages() > 0) {
+                availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
+            }
             if (log.isDebugEnabled() && !c.isWritable()) {
                 log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
                                 + "availablePermits are {}", topic.getName(), name,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ac0186c3c5a..c90aa577961 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1590,6 +1590,46 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
+        final int maxUnacks = 10; // unacked-message cap applied to the broker configuration below
+        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
+        final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(ackReceiptEnabled)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS) // NOTE(review): presumably sends acks ungrouped - confirm
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic)
+                .create();
+
+        final int messages = 1000; // far more than maxUnacks, so the cap is what limits delivery
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Message - " + i);
+        }
+        producer.flush();
+        List<MessageId> receives = new ArrayList<>(); // IDs of messages received but deliberately not yet acked
+        for (int i = 0; i < maxUnacks; i++) {
+            Message<String> received =  consumer.receive();
+            log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
+            receives.add(received.getMessageId());
+        }
+        assertNull(consumer.receive(3, TimeUnit.SECONDS)); // nothing more expected while maxUnacks are unacked
+        consumer.acknowledge(receives); // ack everything held so far
+        for (int i = 0; i < messages - maxUnacks; i++) { // the remaining messages, each acked on receipt
+            Message<String> received =  consumer.receive();
+            log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
+            consumer.acknowledge(received);
+        }
+    }
+
     /**
      * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 55c120592e1..e4cb941c650 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -1708,7 +1708,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             }
 
             // client should not receive all produced messages and should be blocked due to unack-messages
-            assertEquals(messages1.size(), receiverQueueSize);
+            assertEquals(messages1.size(), unAckedMessagesBufferSize);
             Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
                 return (MessageIdImpl) m.getMessageId();
             }).collect(Collectors.toSet());