You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/17 04:22:09 UTC
[pulsar] branch branch-2.9 updated: [branch-2.9][fix][broker]Persist cursor info error when cursor close (#17604)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4dc61755d0e [branch-2.9][fix][broker]Persist cursor info error when cursor close (#17604)
4dc61755d0e is described below
commit 4dc61755d0e8cce3e55b9afa6ec33b0241aef5f4
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Sep 17 12:22:03 2022 +0800
[branch-2.9][fix][broker]Persist cursor info error when cursor close (#17604)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 18 +++++++++--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 35 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1b54bb16a3d..8deb70c6a19 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2455,8 +2455,22 @@ public class ManagedCursorImpl implements ManagedCursor {
callback.closeComplete(ctx);
return;
}
- persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
- STATE_UPDATER.set(this, State.Closed);
+ persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties,
+ new AsyncCallbacks.CloseCallback(){
+
+ @Override
+ public void closeComplete(Object ctx) {
+ STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed);
+ callback.closeComplete(ctx);
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] [{}] persistent position failure when closing, the state will remain in"
+ + " state-closing and will no longer work", ledger.getName(), name);
+ callback.closeFailed(exception, ctx);
+ }
+ }, ctx);
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index db351aa2a74..5f1ed11836b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -69,6 +69,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -102,6 +103,7 @@ import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -129,6 +131,39 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
+ @Test
+ public void testCloseCursor() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxUnackedRangesToPersistInZk(0);
+ config.setThrottleMarkDelete(0);
+ ManagedLedger ledger = factory.open("my_test_ledger", config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ // Write five single-byte entries so that the cursor has positions to delete.
+ ledger.addEntry(new byte[]{1});
+ ledger.addEntry(new byte[]{2});
+ ledger.addEntry(new byte[]{3});
+ ledger.addEntry(new byte[]{4});
+ ledger.addEntry(new byte[]{5});
+ // Delete at the read position, then wait until the cursor stats report at least one successful ledger persist.
+ c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
+ Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
+ // Close the cursor's ledger handle so that the cursor ledger can no longer be used, then delete again and close.
+ closeCursorLedger(c1);
+ c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2));
+ ledger.close();
+ }
+
+ private static void closeCursorLedger(ManagedCursorImpl managedCursor) { // Test helper: closes the cursor's "cursorLedger" handle once it is non-null.
+ Awaitility.await().until(() -> {
+ LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger"); // Read the internal field by name.
+ if (ledgerHandle == null) {
+ return false; // No handle yet: report "not done" so the await condition is evaluated again.
+ }
+ ledgerHandle.close();
+ return true; // Handle closed: the await condition is satisfied.
+ });
+ }
+
@Test(timeOut = 20000)
void readFromEmptyLedger() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");