You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/05/13 05:48:48 UTC
[pulsar] branch master updated: [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 498cde9ad3d [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
498cde9ad3d is described below
commit 498cde9ad3dd62142d73e024ea424bd76726dfaa
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 13 13:48:34 2022 +0800
[Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
### Motivation
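Currently, when handling the low water mark, PendingAckHandle appends the abort mark using the ID of the transaction that is ending, which is wrong. The abort mark should instead be appended for the first transaction in individualAckOfTransaction, after that transaction has been checked against the low water mark.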
### Modification
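Abort the first transaction in individualAckOfTransaction (via abortTxn) once it has been checked against the low water mark, so that the abort mark is appended for that transaction rather than for the ending one.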
---
.../pendingack/impl/PendingAckHandleImpl.java | 22 ++--
.../pendingack/PendingAckPersistentTest.java | 117 +++++++++++++++++++++
2 files changed, 124 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index dc4f89d5e11..98b6700ca20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -600,21 +600,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
&& firstTxn.getLeastSigBits() <= lowWaterMark) {
- this.pendingAckStoreFuture.whenComplete((pendingAckStore, throwable) -> {
- if (throwable == null) {
- pendingAckStore.appendAbortMark(txnID, AckType.Individual).thenAccept(v -> {
- synchronized (PendingAckHandleImpl.this) {
- log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
- individualAckOfTransaction.remove(firstTxn);
- handleLowWaterMark(txnID, lowWaterMark);
- }
- }).exceptionally(e -> {
- log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
- return null;
- });
- }
+ abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
+ log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
+ }).exceptionally(e -> {
+ log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
+ return null;
});
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 58408cdee10..7bc562ca62b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -32,6 +34,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -44,6 +47,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -345,4 +350,116 @@ public class PendingAckPersistentTest extends TransactionTestBase {
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2)));
assertFalse(topics.contains(topic));
}
+
+ @Test
+ public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "test").toString();
+
+ String subName = "subName";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().send();
+ }
+
+ Transaction transaction1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message1.getMessageId(), transaction1);
+ transaction1.commit().get();
+
+
+ Transaction transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ while (transaction1.getTxnID().getMostSigBits() != transaction2.getTxnID().getMostSigBits()) {
+ transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ }
+
+ Transaction transaction3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ while (transaction1.getTxnID().getMostSigBits() != transaction3.getTxnID().getMostSigBits()) {
+ transaction3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ }
+
+ Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message3.getMessageId(), transaction2);
+ transaction2.commit().get();
+
+ Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+
+ Field field = TransactionImpl.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(transaction1, TransactionImpl.State.OPEN);
+
+ consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get();
+ Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+ field.set(transaction2, TransactionImpl.State.OPEN);
+ consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get();
+
+ Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message5.getMessageId(), transaction3);
+ transaction3.commit().get();
+
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) getPulsarServiceList()
+ .get(0)
+ .getBrokerService()
+ .getTopic(topic, false)
+ .get()
+ .get();
+
+ PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+ PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
+
+ Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
+ method.setAccessible(true);
+ method.invoke(pendingAckHandle);
+
+ Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+ field1.setAccessible(true);
+ CompletableFuture<PendingAckStore> completableFuture =
+ (CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);
+
+ Awaitility.await().until(() -> {
+ completableFuture.get();
+ return true;
+ });
+
+ Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+ field2.setAccessible(true);
+ LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
+ (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);
+
+ assertFalse(individualAckOfTransaction.containsKey(transaction1.getTxnID()));
+ assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
+
+ }
}