You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/25 01:36:58 UTC

[pulsar] branch master updated: [fix][broker]Persist cursor info error when cursor close (#17255)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 46b1239a23f [fix][broker]Persist cursor info error when cursor close (#17255)
46b1239a23f is described below

commit 46b1239a23fd2a250b009df1e9b4a65341d299a7
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Aug 25 09:36:51 2022 +0800

    [fix][broker]Persist cursor info error when cursor close (#17255)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 18 +++++++++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 35 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 72b8dc5a8e2..de85ac92ee8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2545,8 +2545,22 @@ public class ManagedCursorImpl implements ManagedCursor {
             callback.closeComplete(ctx);
             return;
         }
-        persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
-        STATE_UPDATER.set(this, State.Closed);
+        persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties,
+                new AsyncCallbacks.CloseCallback(){
+
+                    @Override
+                    public void closeComplete(Object ctx) {
+                        STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed);
+                        callback.closeComplete(ctx);
+                    }
+
+                    @Override
+                    public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                        log.warn("[{}] [{}] persistent position failure when closing, the state will remain in"
+                                + " state-closing and will no longer work", ledger.getName(), name);
+                        callback.closeFailed(exception, ctx);
+                    }
+                }, ctx);
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 09fa49dadd9..052f6ac2d54 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -70,6 +70,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -103,6 +104,7 @@ import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -119,6 +121,39 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
 
+    @Test
+    public void testCloseCursor() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxUnackedRangesToPersistInMetadataStore(0);
+        config.setThrottleMarkDelete(0);
+        ManagedLedger ledger = factory.open("my_test_ledger", config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        // Write some data.
+        ledger.addEntry(new byte[]{1});
+        ledger.addEntry(new byte[]{2});
+        ledger.addEntry(new byte[]{3});
+        ledger.addEntry(new byte[]{4});
+        ledger.addEntry(new byte[]{5});
+        // Persistent cursor info to ledger.
+        c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
+        Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
+        // Make cursor ledger can not work.
+        closeCursorLedger(c1);
+        c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2));
+        ledger.close();
+    }
+
+    private static void closeCursorLedger(ManagedCursorImpl managedCursor) {
+        Awaitility.await().until(() -> {
+            LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger");
+            if (ledgerHandle == null) {
+                return false;
+            }
+            ledgerHandle.close();
+            return true;
+        });
+    }
+
     @Test(timeOut = 20000)
     void readFromEmptyLedger() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");