You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/03/25 01:58:46 UTC

[pulsar] branch branch-2.9 updated: [fix][txn]: fix pending ack is recovering throw CursorAlreadyClosedxception (#14781)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new cb4b004  [fix][txn]: fix pending ack is recovering throw CursorAlreadyClosedxception (#14781)
cb4b004 is described below

commit cb4b004f21b959f8e4a733e45c5972380738264c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Mar 23 11:08:04 2022 +0800

    [fix][txn]: fix pending ack is recovering throw CursorAlreadyClosedxception (#14781)
    
    ### Motivation
    When Transaction PendingAck recover fail throw CursorAlreadyClosedException, we should stop the recover op. the cursor was been closed, the pendingAck was been closed, so we should stop the recover op, in order to release thread resources
    
    ```
    02:03:00.072 [pulsar-transaction-executor-4-1] ERROR org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore - MLPendingAckStore of topic [public/default/persistent/source-topic-partition-13-test__transaction_pending_ack] stat reply fail!
    org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
    ```
    
    ### Modifications
    When recover fail by CursorAlreadyClosedException, comeplete recover
    ### Verifying this change
    add test for it
    
    (cherry picked from commit 9f30ee909a683458cb0c0ba7b6cf0c5bd874f4ea)
---
 .../broker/transaction/pendingack/impl/MLPendingAckStore.java |  3 ++-
 .../org/apache/pulsar/broker/transaction/TransactionTest.java | 11 +++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index a7a46f9..0e4ed44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -398,7 +398,8 @@ public class MLPendingAckStore implements PendingAckStore {
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             if (managedLedger.getConfig().isAutoSkipNonRecoverableData()
                     && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
-                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException
+                    || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
             }
             log.error("MLPendingAckStore of topic [{}] stat reply fail!", managedLedger.getName(), exception);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a5a39ab..308fa35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -622,6 +622,17 @@ public class TransactionTest extends TransactionTestBase {
         PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(pendingAckHandle2.getStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(pendingAckHandle3.getStats().state, "Ready"));
     }
 
     @Test