You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:34 UTC

[pulsar] 04/17: [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa78cf8f45ba40f5e699f3510606db444846b323
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat May 21 17:15:56 2022 +0800

    [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)
    
    (cherry picked from commit fcf5e148b055d617db37eef3c40d4004e74190a5)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  72 ++++++-----
 .../pendingack/impl/PendingAckHandleImpl.java      |  49 ++++++--
 .../buffer/TransactionLowWaterMarkTest.java        | 140 ++++++++++++++++++++-
 .../pendingack/PendingAckPersistentTest.java       |  20 ++-
 4 files changed, 223 insertions(+), 58 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c9cde544c80..6469ba9fb9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.SneakyThrows;
@@ -96,8 +97,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();
 
+    /**
+     * Stores the low water mark of each TC: the key is the TC ID and the value is that TC's lowWaterMark.
+     */
     private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
 
+    private final Semaphore handleLowWaterMark = new Semaphore(1);
+
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
@@ -285,13 +291,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
-        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
-            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
-                return lowWaterMark;
-            } else {
-                return oldLowWaterMark;
-            }
-        });
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
         }
@@ -332,13 +331,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
-        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
-            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
-                return lowWaterMark;
-            } else {
-                return oldLowWaterMark;
-            }
-        });
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
         }
@@ -358,12 +350,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                         synchronized (TopicTransactionBuffer.this) {
                             aborts.put(txnID, (PositionImpl) position);
                             updateMaxReadPosition(txnID);
-                            handleLowWaterMark(txnID, lowWaterMark);
                             changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
                             clearAbortedTransactions();
                             takeSnapshotByChangeTimes();
                         }
                         completableFuture.complete(null);
+                        handleLowWaterMark(txnID, lowWaterMark);
                     }
 
                     @Override
@@ -384,30 +376,36 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
-        if (!ongoingTxns.isEmpty()) {
-            TxnID firstTxn = ongoingTxns.firstKey();
-            if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && lowWaterMark >= firstTxn.getLeastSigBits()) {
-                ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
-                        firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
-                try {
-                    topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
-                        @Override
-                        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                            synchronized (TopicTransactionBuffer.this) {
-                                aborts.put(firstTxn, (PositionImpl) position);
-                                updateMaxReadPosition(firstTxn);
-                            }
-                        }
-
-                        @Override
-                        public void addFailed(ManagedLedgerException exception, Object ctx) {
-                            log.error("Failed to abort low water mark for txn {}", txnID, exception);
-                        }
-                    }, null);
-                } finally {
-                    abortMarker.release();
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
+        if (handleLowWaterMark.tryAcquire()) {
+            if (!ongoingTxns.isEmpty()) {
+                TxnID firstTxn = ongoingTxns.firstKey();
+                long tCId = firstTxn.getMostSigBits();
+                Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+                if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+                    abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+                            .thenRun(() -> {
+                                log.warn("Successes to abort low water mark for txn [{}], topic [{}],"
+                                        + " lowWaterMark [{}]", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId);
+                                handleLowWaterMark.release();
+                            })
+                            .exceptionally(ex -> {
+                                log.warn("Failed to abort low water mark for txn {}, topic [{}], "
+                                        + "lowWaterMark [{}], ", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId,
+                                        ex);
+                                handleLowWaterMark.release();
+                                return null;
+                            });
+                    return;
                 }
             }
+            handleLowWaterMark.release();
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7e11592e063..323a1414e43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -29,10 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -114,6 +116,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
     private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
 
+    /**
+     * Stores the low water mark of each TC: the key is the TC ID and the value is that TC's lowWaterMark.
+     */
+    private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+
+    private final Semaphore handleLowWaterMark = new Semaphore(1);
+
     @Getter
     private final ExecutorService internalPinnedExecutor;
 
@@ -595,20 +604,34 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     }
 
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
-        if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) {
-            TxnID firstTxn = individualAckOfTransaction.firstKey();
-
-            if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
-                    && firstTxn.getLeastSigBits() <= lowWaterMark) {
-                abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
-                    log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
-                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                }).exceptionally(e -> {
-                    log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
-                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                    return null;
-                });
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
+
+        if (handleLowWaterMark.tryAcquire()) {
+            if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) {
+                TxnID firstTxn = individualAckOfTransaction.firstKey();
+                long tCId = firstTxn.getMostSigBits();
+                Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+                if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+                    abortTxn(firstTxn, null, lowWaterMarkOfFirstTxnId).thenRun(() -> {
+                        log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+                                + "lowWaterMark : [{}]", topicName, firstTxn, lowWaterMarkOfFirstTxnId);
+                        handleLowWaterMark.release();
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+                                + "lowWaterMark : [{}]", topicName, firstTxn, lowWaterMarkOfFirstTxnId);
+                        handleLowWaterMark.release();
+                        return null;
+                    });
+                    return;
+                }
             }
+            handleLowWaterMark.release();
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index ba0659892b4..a1ff3e7d34a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -37,7 +37,9 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -71,7 +73,7 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionLowWaterMarkTest extends TransactionTestBase {
 
-    private static final String TOPIC = NAMESPACE1 + "/test-topic";
+    private static final String TOPIC = "persistent://" + NAMESPACE1 + "/test-topic";
 
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
@@ -216,7 +218,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
             ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                     (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
                             .get(getPulsarServiceList().get(i).getBrokerService());
-            CompletableFuture<Optional<Topic>> completableFuture = topics.get("persistent://" + TOPIC);
+            CompletableFuture<Optional<Topic>> completableFuture = topics.get(TOPIC);
             if (completableFuture != null) {
                 Optional<Topic> topic = completableFuture.get();
                 if (topic.isPresent()) {
@@ -327,4 +329,138 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
             // no-op
         }
     }
+
+    @Test
+    public void testLowWaterMarkForDifferentTC() throws Exception {
+        String subName = "sub";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(TOPIC)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(TOPIC)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn2.getTxnID().getMostSigBits() == txn1.getTxnID().getMostSigBits()) {
+            txn2 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+        Transaction txn3 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn3.getTxnID().getMostSigBits() != txn2.getTxnID().getMostSigBits()) {
+            txn3 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+
+        Transaction txn4 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn4.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) {
+            txn4 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().send();
+        }
+
+        producer.newMessage(txn1).send();
+        producer.newMessage(txn2).send();
+        producer.newMessage(txn3).send();
+        producer.newMessage(txn4).send();
+
+        Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message1.getMessageId(), txn1);
+        Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message2.getMessageId(), txn2);
+        Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message3.getMessageId(), txn3);
+        Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message4.getMessageId(), txn4);
+
+        txn1.commit().get();
+        txn2.commit().get();
+
+        Field field = TransactionImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(txn1, TransactionImpl.State.OPEN);
+        field.set(txn2, TransactionImpl.State.OPEN);
+
+        producer.newMessage(txn1).send();
+        producer.newMessage(txn2).send();
+
+        Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message5.getMessageId(), txn1);
+        Message<byte[]> message6 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message6.getMessageId(), txn2);
+
+        txn3.commit().get();
+        TxnID txnID1 = txn1.getTxnID();
+        TxnID txnID2 = txn2.getTxnID();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(checkTxnIsOngoingInTP(txnID1, subName));
+            assertTrue(checkTxnIsOngoingInTP(txnID2, subName));
+            assertTrue(checkTxnIsOngoingInTB(txnID1));
+            assertTrue(checkTxnIsOngoingInTB(txnID2));
+        });
+
+        txn4.commit().get();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(checkTxnIsOngoingInTP(txnID1, subName));
+            assertFalse(checkTxnIsOngoingInTP(txnID2, subName));
+            assertFalse(checkTxnIsOngoingInTB(txnID1));
+            assertFalse(checkTxnIsOngoingInTB(txnID2));
+        });
+    }
+
+    private boolean checkTxnIsOngoingInTP(TxnID txnID, String subName) throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(TopicName.get(TOPIC).toString(), false)
+                .get().get();
+
+        PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+
+        Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+        field1.setAccessible(true);
+        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) field1.get(persistentSubscription);
+
+        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+        field2.setAccessible(true);
+        LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
+                (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);
+        return individualAckOfTransaction.containsKey(txnID);
+    }
+
+    private boolean checkTxnIsOngoingInTB(TxnID txnID) throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(TopicName.get(TOPIC).toString(), false)
+                .get().get();
+
+        TopicTransactionBuffer topicTransactionBuffer =
+                (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        Field field3 = TopicTransactionBuffer.class.getDeclaredField("ongoingTxns");
+        field3.setAccessible(true);
+        LinkedMap<TxnID, PositionImpl> ongoingTxns =
+                (LinkedMap<TxnID, PositionImpl>) field3.get(topicTransactionBuffer);
+        return ongoingTxns.containsKey(txnID);
+
+    }
+
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 96b2b9c14bf..367c63797d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -542,24 +542,32 @@ public class PendingAckPersistentTest extends TransactionTestBase {
                         .get();
 
         PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+        Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+        field1.setAccessible(true);
+        PendingAckHandleImpl oldPendingAckHandle = (PendingAckHandleImpl) field1.get(persistentSubscription);
+        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+        field2.setAccessible(true);
+        LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> oldIndividualAckOfTransaction =
+                (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(oldPendingAckHandle);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(oldIndividualAckOfTransaction.size(), 0));
+
         PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
 
         Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
         method.setAccessible(true);
         method.invoke(pendingAckHandle);
 
-        Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
-        field1.setAccessible(true);
-        CompletableFuture<PendingAckStore> completableFuture =
-                (CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);
+        Field field3 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+        field3.setAccessible(true);
 
         Awaitility.await().until(() -> {
+            CompletableFuture<PendingAckStore> completableFuture =
+                    (CompletableFuture<PendingAckStore>) field3.get(pendingAckHandle);
             completableFuture.get();
             return true;
         });
 
-        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
-        field2.setAccessible(true);
+
         LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
                 (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);