You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/01 06:44:49 UTC

[pulsar] branch branch-2.11 updated: [fix][txn] fix ack with txn compute ackedCount error (#17016)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new d9cd5f4d18f [fix][txn] fix ack with txn compute ackedCount error (#17016)
d9cd5f4d18f is described below

commit d9cd5f4d18fcae429626fc726b54bc19dd07d455
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 1 12:29:04 2022 +0800

    [fix][txn] fix ack with txn compute ackedCount error (#17016)
    
    Co-authored-by: congbobo184 <co...@github.com>
---
 .../org/apache/pulsar/broker/service/Consumer.java | 25 ++++++------
 .../client/impl/TransactionEndToEndTest.java       | 46 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 85df9ff107c..8dfed3e36e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -531,25 +531,28 @@ public class Consumer {
         LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
-            PositionImpl position;
+            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+            // acked count at least one
             long ackedCount = 0;
-            long batchSize = getBatchSize(msgId);
+            long batchSize = 0;
+            if (msgId.hasBatchSize()) {
+                batchSize = msgId.getBatchSize();
+                // ack batch messages set ackeCount = batchSize
+                ackedCount = msgId.getBatchSize();
+                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+            } else {
+                // ack no batch message set ackedCount = 1
+                ackedCount = 1;
+                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            }
             Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
-                position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
+                position.setAckSet(ackSets);
                 ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
-            } else {
-                position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
-                ackedCount = batchSize;
-            }
-            if (msgId.hasBatchSize()) {
-                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
-            } else {
-                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
             }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 62406410b0b..37d9eb6967d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1172,6 +1172,52 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
+        final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks";
+        final String subName = "test";
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName)
+                .subscribe();
+
+        // send 5 messages with one batch
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync((i + "").getBytes(UTF_8));
+        }
+
+        List<MessageId> messageIds = new ArrayList<>();
+
+        // receive the batch messages add to a list
+        for (int i = 0; i < 5; i++) {
+            messageIds.add(consumer.receive().getMessageId());
+        }
+
+        MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
+
+
+        // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck
+        getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+                .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
+                .remove(messageId.ledgerId, messageId.entryId);
+
+        Transaction txn = getTxn();
+        consumer.acknowledgeAsync(messageIds.get(1), txn).get();
+
+        // ack one message, the unack count is 4
+        assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+                .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
+    }
+
     @Test
     public void testSendTxnAckMessageToDLQ() throws Exception {
         String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";