You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/22 07:24:47 UTC

[pulsar] branch master updated: [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fd9c4181475 [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
fd9c4181475 is described below

commit fd9c418147503ec226f2c87e187a58af6f601b7d
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Jul 22 15:24:40 2022 +0800

    [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java |  6 +++++-
 .../pulsar/client/api/SimpleProducerConsumerTest.java   | 17 ++++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 0a402d8322a..60762d8400a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -236,7 +236,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
             int entriesWithSameKeyCount = entriesWithSameKey.size();
-            final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
+            int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
+            if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
+                availablePermits = Math.min(availablePermits,
+                        consumer.getMaxUnackedMessages() - consumer.getUnackedMessages());
+            }
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
             int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
                     readType, consumerStickyKeyHashesMap.get(consumer));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9af19cc672d..2f572a841b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -149,6 +149,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         return new Object[][] { { true }, { false } };
     }
 
+    @DataProvider(name = "ackReceiptEnabledAndSubscriptionTypes")
+    public Object[][] ackReceiptEnabledAndSubscriptionTypes() {
+        return new Object[][] {
+                {true, SubscriptionType.Shared},
+                {true, SubscriptionType.Key_Shared},
+                {false, SubscriptionType.Shared},
+                {false, SubscriptionType.Key_Shared},
+        };
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
@@ -1591,8 +1601,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(dataProvider = "ackReceiptEnabled")
-    public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
+    @Test(dataProvider = "ackReceiptEnabledAndSubscriptionTypes")
+    public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled, SubscriptionType subType)
+            throws PulsarClientException {
         final int maxUnacks = 10;
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
         final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
@@ -1600,7 +1611,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic).subscriptionName("sub")
-                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionType(subType)
                 .isAckReceiptEnabled(ackReceiptEnabled)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                 .subscribe();