You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/05/13 05:48:48 UTC

[pulsar] branch master updated: [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 498cde9ad3d [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
498cde9ad3d is described below

commit 498cde9ad3dd62142d73e024ea424bd76726dfaa
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 13 13:48:34 2022 +0800

    [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
    
    ### Motivation
    Now, PendingAckHandle use the ending transaction ID to append abort mark, but it is wrong. We should append abort mark for the first transaction in the  individualAckOfTransaction after judgment.
    ### Modification
    Append abort mark for the first transaction in the  individualAckOfTransaction after judgment.
---
 .../pendingack/impl/PendingAckHandleImpl.java      |  22 ++--
 .../pendingack/PendingAckPersistentTest.java       | 117 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index dc4f89d5e11..98b6700ca20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -600,21 +600,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
             if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
                     && firstTxn.getLeastSigBits() <= lowWaterMark) {
-                this.pendingAckStoreFuture.whenComplete((pendingAckStore, throwable) -> {
-                    if (throwable == null) {
-                        pendingAckStore.appendAbortMark(txnID, AckType.Individual).thenAccept(v -> {
-                            synchronized (PendingAckHandleImpl.this) {
-                                log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
-                                        + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                                individualAckOfTransaction.remove(firstTxn);
-                                handleLowWaterMark(txnID, lowWaterMark);
-                            }
-                        }).exceptionally(e -> {
-                            log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
-                                    + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                            return null;
-                        });
-                    }
+                abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
+                    log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
+                }).exceptionally(e -> {
+                    log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
+                    return null;
                 });
             }
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 58408cdee10..7bc562ca62b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -32,6 +34,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -44,6 +47,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -345,4 +350,116 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2)));
         assertFalse(topics.contains(topic));
     }
+
+    @Test
+    public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "test").toString();
+
+        String subName = "subName";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().send();
+        }
+
+        Transaction transaction1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message1.getMessageId(), transaction1);
+        transaction1.commit().get();
+
+
+        Transaction transaction2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        while (transaction1.getTxnID().getMostSigBits() != transaction2.getTxnID().getMostSigBits()) {
+            transaction2 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build()
+                    .get();
+        }
+
+        Transaction transaction3 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        while (transaction1.getTxnID().getMostSigBits() != transaction3.getTxnID().getMostSigBits()) {
+            transaction3 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build()
+                    .get();
+        }
+
+        Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message3.getMessageId(), transaction2);
+        transaction2.commit().get();
+
+        Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+
+        Field field = TransactionImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(transaction1, TransactionImpl.State.OPEN);
+
+        consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get();
+        Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+        field.set(transaction2, TransactionImpl.State.OPEN);
+        consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get();
+
+        Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message5.getMessageId(), transaction3);
+        transaction3.commit().get();
+
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) getPulsarServiceList()
+                        .get(0)
+                        .getBrokerService()
+                        .getTopic(topic, false)
+                        .get()
+                        .get();
+
+        PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+        PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
+
+        Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
+        method.setAccessible(true);
+        method.invoke(pendingAckHandle);
+
+        Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+        field1.setAccessible(true);
+        CompletableFuture<PendingAckStore> completableFuture =
+                (CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);
+
+        Awaitility.await().until(() -> {
+            completableFuture.get();
+            return true;
+        });
+
+        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+        field2.setAccessible(true);
+        LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
+                (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);
+
+        assertFalse(individualAckOfTransaction.containsKey(transaction1.getTxnID()));
+        assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
+
+    }
 }