You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/13 02:29:32 UTC
[pulsar] branch master updated: [fix][broker] fix broker unackmessages number reduce error (#17003)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5262e6c8b4d [fix][broker] fix broker unackmessages number reduce error (#17003)
5262e6c8b4d is described below
commit 5262e6c8b4d2a98ac7f73a94a30f001630b2be28
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Aug 13 10:29:26 2022 +0800
[fix][broker] fix broker unackmessages number reduce error (#17003)
---
.../org/apache/pulsar/broker/service/Consumer.java | 13 ++--
.../BatchMessageWithBatchIndexLevelTest.java | 82 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 5c7646921fb..20f3d3f74d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -474,7 +474,7 @@ public class Consumer {
ackSets[j] = msgId.getAckSetAt(j);
}
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
- ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
+ ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer);
if (isTransactionEnabled()) {
//sync the batch position bit set point, in order to delete the position in pending acks
if (Subscription.isIndividualAckMode(subType)) {
@@ -484,7 +484,7 @@ public class Consumer {
}
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
- ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
+ ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
}
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
@@ -592,20 +592,21 @@ public class Consumer {
return batchSize;
}
- private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
+ private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
- return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
+ return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer);
}
}
return batchSize;
}
- private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
+ private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets,
+ Consumer consumer) {
long ackedCount = 0;
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)
- && pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
+ && consumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) != null) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index b953772ad67..d5c4e1eb064 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -33,11 +33,13 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
@Test(groups = "broker")
public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
@@ -195,4 +197,84 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
assertEquals(unackedMessages, 10);
});
}
+
+ @Test
+ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Exception {
+ final String subName = "test";
+ final String topicName = "persistent://prop/ns-abc/testAckMessageWithNotOwnerConsumerUnAckMessageCount-"
+ + UUID.randomUUID();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .topic(topicName)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .enableBatching(true)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient
+ .newConsumer()
+ .topic(topicName)
+ .consumerName("consumer-1")
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .isAckReceiptEnabled(true)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient
+ .newConsumer()
+ .topic(topicName)
+ .consumerName("consumer-2")
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .isAckReceiptEnabled(true)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync();
+ }
+
+ // consumer-1 receives the 5 messages (presumably delivered as one batch, since batching is enabled)
+ List<MessageId> list = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ list.add(consumer1.receive().getMessageId());
+ }
+
+ // consumer-1 negatively acknowledges the first message id to request redelivery of the batch
+ consumer1.negativeAcknowledge(list.get(0));
+
+ // consumer-2 receives 5 messages, expected to be the ones redelivered after consumer-1's negative ack
+ for (int i = 0; i < 5; i++) {
+ consumer2.receive().getMessageId();
+ }
+
+ // consumer-1 acks two of the message ids it originally received (indexes 1 and 2)
+ consumer1.acknowledge(list.get(1));
+ consumer1.acknowledge(list.get(2));
+
+ // consumer-2 negatively acknowledges a message id from the same list to request redelivery of the rest
+ consumer2.negativeAcknowledge(list.get(1));
+
+ // closing consumer-1 is expected to redeliver its remaining messages to consumer-2
+ consumer1.close();
+
+ // consumer-2 receives and acks the remaining 3 messages
+ for (int i = 0; i < 3; i++) {
+ consumer2.acknowledge(consumer2.receive().getMessageId());
+ }
+
+ // consumer-2 must not receive anything more: all messages in the batch have been acked
+ Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
+ assertNull(message);
+
+ // wait until the first consumer on the subscription (presumably consumer-2) reports 0 unacked messages
+ Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false)
+ .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0);
+ }
}