You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/23 16:32:24 UTC
[pulsar] branch branch-2.10 updated: [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new bfbe53009b1 [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
bfbe53009b1 is described below
commit bfbe53009b1e8bd1b50301576a3360bb1bfe1c83
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jun 15 21:38:45 2022 +0800
[fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
(cherry picked from commit 0a846ad5d61df7d4842cbcb78d7dfaa86156fa73)
---
.../pendingack/impl/PendingAckHandleImpl.java | 14 ++++++
.../pulsar/broker/transaction/TransactionTest.java | 53 ++++++++++++++++++++++
2 files changed, 67 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 323a1414e43..9f4d49f3f59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWit
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -769,6 +770,19 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
if (!individualAckPositions.containsKey(position)) {
+ /**
+ * The position is not yet tracked in individualAckPositions, so a new position with the
+ * same ledgerId and entryId and a copied ackSet is created and stored there, instead of
+ * the position object that is also held by individualAckOfTransaction.
+ * When another transaction acks the same batch message, it changes the ackSet kept in
+ * individualAckPositions. If that object were shared, then when the TC commits the first
+ * txn it would ack every part of the ackSet that is still only in pending ack status.
+ * Therefore individualAckPositions must not contain the same position object as
+ * individualAckOfTransaction.
+ */
+ MutablePair<PositionImpl, Integer> positionPair = positions.get(i);
+ positionPair.left = PositionImpl.get(positionPair.getLeft().getLedgerId(),
+ positionPair.getLeft().getEntryId(),
+ Arrays.copyOf(positionPair.left.getAckSet(), positionPair.left.getAckSet().length));
this.individualAckPositions.put(position, positions.get(i));
} else {
MutablePair<PositionImpl, Integer> positionPair =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a885f3527ed..0b5cab28540 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -992,4 +992,57 @@ public class TransactionTest extends TransactionTestBase {
transaction.commit().get();
}
+
+ @Test
+ public void testPendingAckBatchMessageCommit() throws Exception {
+ String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
+
+ // enable acknowledgment at batch index level so single entries of a batch can be acked
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(true)
+ // 3s publish delay: presumably keeps the messages below in one batch until flush()
+ .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient
+ .newConsumer()
+ .subscriptionType(SubscriptionType.Shared)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribe();
+
+ // send 5 messages asynchronously; they are expected to form a single batch message
+ for (int i = 0; i < 5; i++) {
+ producer.sendAsync(("test" + i).getBytes());
+ }
+
+ producer.flush();
+
+ Transaction txn1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+ // ack the first received message within txn1
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
+ Transaction txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+ // ack the second received message within txn2; keep its id to ack it again later
+ MessageId messageId = consumer.receive().getMessageId();
+ consumer.acknowledgeAsync(messageId, txn2).get();
+
+ // commit txn1: only the first message's ack should become effective
+ txn1.commit().get();
+ // abort txn2: the pending ack of the second message is expected to be released
+ txn2.abort().get();
+
+ Transaction txn3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+ // ack the second message again with a new txn; get() throws if this ack is rejected
+ consumer.acknowledgeAsync(messageId, txn3).get();
+ }
}