You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/03/24 05:21:05 UTC

[pulsar] branch branch-2.9 updated: [fix][txn]: fix transaction log recover throw cursor already close (#14810)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 199b2d2  [fix][txn]: fix transaction log recover throw cursor already close (#14810)
199b2d2 is described below

commit 199b2d22e5e9b495e7ba2579a7ec3bf3d475e387
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Mar 24 13:08:36 2022 +0800

    [fix][txn]: fix transaction log recover throw cursor already close (#14810)
    
    ### Motivation
    When Transactionlog recover fail throw CursorAlreadyClosedException, we should stop the recover op. the cursor was been closed, the transaction log was been closed, so we should stop the recover op, in order to release thread resources
    like https://github.com/apache/pulsar/pull/14781
    
    ### Modifications
    When recover fail by CursorAlreadyClosedException, comeplete recover
    
    (cherry picked from commit a14a97e9aea230adf0ce85224d8d47a1c5e0fb22)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java     |  3 ++-
 .../apache/pulsar/broker/transaction/TransactionTest.java   | 13 +++++++++++++
 .../transaction/coordinator/impl/MLTransactionLogImpl.java  |  4 ++--
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 3b28966d..f108a16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -710,9 +710,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                     && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
                     || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                 isReadable = false;
+            } else {
+                outstandingReadsRequests.decrementAndGet();
             }
             recover.callBackException(exception);
-            outstandingReadsRequests.decrementAndGet();
         }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 2b1a2f4..a5a39ab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -682,6 +682,19 @@ public class TransactionTest extends TransactionTestBase {
                         mlTransactionSequenceIdGenerator);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        MLTransactionMetadataStore metadataStore3 =
+                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
+                        mlTransactionSequenceIdGenerator);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
     }
 
     @Test
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 9ab4bd3..2a13f95 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -271,11 +271,11 @@ public class MLTransactionLogImpl implements TransactionLog {
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             if (managedLedgerConfig.isAutoSkipNonRecoverableData()
                     && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
-                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException
+                    || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
             }
             log.error("Transaction log init fail error!", exception);
-            outstandingReadsRequests.decrementAndGet();
         }
 
     }