You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/17 04:22:09 UTC

[pulsar] branch branch-2.9 updated: [branch-2.9][fix][broker]Persist cursor info error when cursor close (#17604)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4dc61755d0e [branch-2.9][fix][broker]Persist cursor info error when cursor close (#17604)
4dc61755d0e is described below

commit 4dc61755d0e8cce3e55b9afa6ec33b0241aef5f4
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Sep 17 12:22:03 2022 +0800

    [branch-2.9][fix][broker]Persist cursor info error when cursor close (#17604)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 18 +++++++++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 35 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1b54bb16a3d..8deb70c6a19 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2455,8 +2455,22 @@ public class ManagedCursorImpl implements ManagedCursor {
             callback.closeComplete(ctx);
             return;
         }
-        persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
-        STATE_UPDATER.set(this, State.Closed);
+        persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties,
+                new AsyncCallbacks.CloseCallback(){
+
+                    @Override
+                    public void closeComplete(Object ctx) {
+                        STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed);
+                        callback.closeComplete(ctx);
+                    }
+
+                    @Override
+                    public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                        log.warn("[{}] [{}] persistent position failure when closing, the state will remain in"
+                                + " state-closing and will no longer work", ledger.getName(), name);
+                        callback.closeFailed(exception, ctx);
+                    }
+                }, ctx);
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index db351aa2a74..5f1ed11836b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -69,6 +69,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -102,6 +103,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -129,6 +131,39 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
 
+    @Test
+    public void testCloseCursor() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxUnackedRangesToPersistInZk(0);
+        config.setThrottleMarkDelete(0);
+        ManagedLedger ledger = factory.open("my_test_ledger", config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        // Write some data.
+        ledger.addEntry(new byte[]{1});
+        ledger.addEntry(new byte[]{2});
+        ledger.addEntry(new byte[]{3});
+        ledger.addEntry(new byte[]{4});
+        ledger.addEntry(new byte[]{5});
+        // Delete one entry and wait until the cursor info has been persisted to a ledger at least once.
+        c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
+        Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
+        // Close the cursor ledger so that it can no longer work.
+        closeCursorLedger(c1);
+        c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2));
+        ledger.close(); // NOTE(review): presumably the close must still complete with the cursor ledger closed - confirm
+    }
+
+    private static void closeCursorLedger(ManagedCursorImpl managedCursor) {
+        Awaitility.await().until(() -> { // poll until the condition below returns true
+            LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger"); // read the field reflectively
+            if (ledgerHandle == null) {
+                return false; // no cursor ledger handle yet; let Awaitility retry
+            }
+            ledgerHandle.close();
+            return true; // handle closed; stop polling
+        });
+    }
+
     @Test(timeOut = 20000)
     void readFromEmptyLedger() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");