You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/21 09:10:42 UTC
[pulsar] branch branch-3.1 updated: [fix][broker] Fix incorrect unack msg count when dup ack a message (#20990)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 01c46556cb9 [fix][broker] Fix incorrect unack msg count when dup ack a message (#20990)
01c46556cb9 is described below
commit 01c46556cb95027591ac854cfab0630d6b64874d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Aug 18 22:54:44 2023 +0800
[fix][broker] Fix incorrect unack msg count when dup ack a message (#20990)
---
.../org/apache/pulsar/broker/service/Consumer.java | 24 +++++++++++++-------
.../pulsar/broker/service/BrokerServiceTest.java | 26 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 176f033a6dc..d138eab3758 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -487,6 +487,7 @@ public class Consumer {
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
+ boolean individualAck = false;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
@@ -510,14 +511,18 @@ public class Consumer {
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
+ individualAck = true;
}
- addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
-
+ if (individualAck) {
+ if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
+ addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+ }
+ } else {
+ addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+ }
positionsAcked.add(position);
- checkCanRemovePendingAcksAndHandle(position, msgId);
-
checkAckValidationError(ack, position);
totalAckCount += ackedCount;
@@ -679,10 +684,11 @@ public class Consumer {
}
}
- private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
+ private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
- removePendingAcks(position);
+ return removePendingAcks(position);
}
+ return false;
}
private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
@@ -949,7 +955,7 @@ public class Consumer {
*
* @param position
*/
- private void removePendingAcks(PositionImpl position) {
+ private boolean removePendingAcks(PositionImpl position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
@@ -970,7 +976,7 @@ public class Consumer {
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
- return;
+ return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
@@ -984,7 +990,9 @@ public class Consumer {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
+ return true;
}
+ return false;
}
public ConcurrentLongLongPairHashMap getPendingAcks() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 6497daa81dc..c61da7fc03b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1732,4 +1732,30 @@ public class BrokerServiceTest extends BrokerTestBase {
assertTrue(brokerService.isAllowAutoTopicCreationAsync(
"persistent://pulsar/system/my-system-topic").get());
}
+
+ @Test
+ public void testDuplicateAcknowledgement() throws Exception {
+ final String ns = "prop/ns-test";
+
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName = "persistent://prop/ns-test/duplicated-acknowledgement-test";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("sub-1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+ producer.send("1".getBytes(StandardCharsets.UTF_8));
+ Message<byte[]> message = consumer1.receive();
+ consumer1.acknowledge(message);
+ consumer1.acknowledge(message);
+ assertEquals(admin.topics().getStats(topicName).getSubscriptions()
+ .get("sub-1").getUnackedMessages(), 0);
+ }
}