You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/23 16:31:15 UTC

[pulsar] branch branch-2.9 updated: [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 741fd6c748c [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
741fd6c748c is described below

commit 741fd6c748c4c799ff51bd125ed5bc34fff6f1c7
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jun 15 21:38:45 2022 +0800

    [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
    
    (cherry picked from commit 0a846ad5d61df7d4842cbcb78d7dfaa86156fa73)
---
 .../pendingack/impl/PendingAckHandleImpl.java      | 14 ++++++
 .../pulsar/broker/transaction/TransactionTest.java | 53 ++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 4fdc298f303..5b808f1dedb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -767,6 +768,19 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 }
 
                 if (!individualAckPositions.containsKey(position)) {
+                    /**
+                     *  If the position does not exist in {@link individualAckPositions} yet, create a
+                     *  new position with the same ledgerId and entryId and a copy of the ackSet, and put
+                     *  that copy into {@link individualAckPositions}.
+                     *  Otherwise, when another transaction acks the same batch message, it would change the
+                     *  shared ackSet, and when the TC commits the first txn it would ack every batch index
+                     *  that is in pending ack status. So {@link individualAckPositions} must not hold
+                     *  the same position object as {@link individualAckOfTransaction}.
+                     */
+                    MutablePair<PositionImpl, Integer> positionPair = positions.get(i);
+                    positionPair.left = PositionImpl.get(positionPair.getLeft().getLedgerId(),
+                            positionPair.getLeft().getEntryId(),
+                            Arrays.copyOf(positionPair.left.getAckSet(), positionPair.left.getAckSet().length));
                     this.individualAckPositions.put(position, positions.get(i));
                 } else {
                     MutablePair<PositionImpl, Integer> positionPair =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0b59eda4523..10aa86aed2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -996,4 +996,57 @@ public class TransactionTest extends TransactionTestBase {
 
         transaction.commit().get();
     }
+
+    @Test
+    public void testPendingAckBatchMessageCommit() throws Exception {
+        String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
+
+        // enable batch index ack
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer(Schema.BYTES)
+                .topic(topic)
+                .enableBatching(true)
+                // ensure that batch message is sent
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+
+        // send 5 messages so that they are published as one batch message
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync(("test" + i).getBytes());
+        }
+
+        producer.flush();
+
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+        // ack the first message with transaction
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+        // ack the second message with transaction
+        MessageId messageId = consumer.receive().getMessageId();
+        consumer.acknowledgeAsync(messageId, txn2).get();
+
+        // commit the txn1
+        txn1.commit().get();
+        // abort the txn2
+        txn2.abort().get();
+
+        Transaction txn3 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+        // ack the second message again with txn3; this must succeed because txn2 was aborted
+        consumer.acknowledgeAsync(messageId, txn3).get();
+    }
 }