You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 06:21:38 UTC

[pulsar] 02/02: [fix][txn]: fix transaction buffer recover throw cursor already close (#14807)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f0a2171cbad894cec5bfb2d4de31cb8de32a3183
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 25 14:16:29 2022 +0800

    [fix][txn]: fix transaction buffer recover throw cursor already close (#14807)
    
    ### Motivation
    When Transaction buffer recover fail throw CursorAlreadyClosedException, we should stop the recover op. the cursor was been closed, the transaction buffer was been closed, so we should stop the recover op, in order to release thread resources
    like https://github.com/apache/pulsar/pull/14781
    
    (cherry picked from commit aef5f6d5e2d44f84c9c358f1d9dd9db1108a9d99)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java      |  3 ++-
 .../apache/pulsar/broker/transaction/TransactionTest.java    | 12 ++++++++++++
 .../transaction/coordinator/impl/MLTransactionLogImpl.java   |  2 ++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 9ea3b42..66ce8f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -734,7 +734,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
                     && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
-                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException
+                    || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
             } else {
                 outstandingReadsRequests.decrementAndGet();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 6fd1e92..231c183 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -568,6 +568,18 @@ public class TransactionTest extends TransactionTestBase {
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer2.getStats().state, "Ready"));
         managedCursors.removeCursor("transaction-buffer-sub");
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        managedCursors.add(managedCursor);
+        TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(buffer3.getStats().state, "Ready"));
+        managedCursors.removeCursor("transaction-buffer-sub");
     }
 
     @Test
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 2a13f95..f14784e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -274,6 +274,8 @@ public class MLTransactionLogImpl implements TransactionLog {
                     || exception instanceof ManagedLedgerException.ManagedLedgerFencedException
                     || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
+            } else {
+                outstandingReadsRequests.decrementAndGet();
             }
             log.error("Transaction log init fail error!", exception);
         }