You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/22 07:24:47 UTC
[pulsar] branch master updated: [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fd9c4181475 [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
fd9c4181475 is described below
commit fd9c418147503ec226f2c87e187a58af6f601b7d
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Jul 22 15:24:40 2022 +0800
[fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
---
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 6 +++++-
.../pulsar/client/api/SimpleProducerConsumerTest.java | 17 ++++++++++++++---
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 0a402d8322a..60762d8400a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -236,7 +236,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
Consumer consumer = current.getKey();
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();
- final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
+ int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
+ if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
+ availablePermits = Math.min(availablePermits,
+ consumer.getMaxUnackedMessages() - consumer.getUnackedMessages());
+ }
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
readType, consumerStickyKeyHashesMap.get(consumer));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9af19cc672d..2f572a841b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -149,6 +149,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
return new Object[][] { { true }, { false } };
}
+ @DataProvider(name = "ackReceiptEnabledAndSubscriptionTypes")
+ public Object[][] ackReceiptEnabledAndSubscriptionTypes() {
+ return new Object[][] {
+ {true, SubscriptionType.Shared},
+ {true, SubscriptionType.Key_Shared},
+ {false, SubscriptionType.Shared},
+ {false, SubscriptionType.Key_Shared},
+ };
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
@@ -1591,8 +1601,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
}
- @Test(dataProvider = "ackReceiptEnabled")
- public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
+ @Test(dataProvider = "ackReceiptEnabledAndSubscriptionTypes")
+ public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled, SubscriptionType subType)
+ throws PulsarClientException {
final int maxUnacks = 10;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
@@ -1600,7 +1611,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName("sub")
- .subscriptionType(SubscriptionType.Shared)
+ .subscriptionType(subType)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();