You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/25 01:36:58 UTC
[pulsar] branch master updated: [fix][broker]Persist cursor info error when cursor close (#17255)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 46b1239a23f [fix][broker]Persist cursor info error when cursor close (#17255)
46b1239a23f is described below
commit 46b1239a23fd2a250b009df1e9b4a65341d299a7
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Aug 25 09:36:51 2022 +0800
[fix][broker]Persist cursor info error when cursor close (#17255)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 18 +++++++++--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 35 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 72b8dc5a8e2..de85ac92ee8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2545,8 +2545,22 @@ public class ManagedCursorImpl implements ManagedCursor {
callback.closeComplete(ctx);
return;
}
- persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
- STATE_UPDATER.set(this, State.Closed);
+ persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties,
+ new AsyncCallbacks.CloseCallback(){
+
+ @Override
+ public void closeComplete(Object ctx) {
+ STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed);
+ callback.closeComplete(ctx);
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] [{}] persistent position failure when closing, the state will remain in"
+ + " state-closing and will no longer work", ledger.getName(), name);
+ callback.closeFailed(exception, ctx);
+ }
+ }, ctx);
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 09fa49dadd9..052f6ac2d54 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -70,6 +70,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -103,6 +104,7 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -119,6 +121,39 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
+ @Test
+ public void testCloseCursor() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxUnackedRangesToPersistInMetadataStore(0);
+ config.setThrottleMarkDelete(0);
+ ManagedLedger ledger = factory.open("my_test_ledger", config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ // Write five single-byte entries.
+ ledger.addEntry(new byte[]{1});
+ ledger.addEntry(new byte[]{2});
+ ledger.addEntry(new byte[]{3});
+ ledger.addEntry(new byte[]{4});
+ ledger.addEntry(new byte[]{5});
+ // Delete the entry at the read position, then wait until the cursor stats report a successful ledger persist.
+ c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
+ Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
+ // Close the cursor's ledger handle so that the cursor ledger can no longer work.
+ closeCursorLedger(c1);
+ c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2)); // delete again after the cursor ledger was closed
+ ledger.close(); // no explicit assertion: the test only requires close() to return without throwing
+ }
+
+ private static void closeCursorLedger(ManagedCursorImpl managedCursor) { // polls until the cursor's "cursorLedger" handle is non-null, then closes it
+ Awaitility.await().until(() -> {
+ LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger"); // read the field by name via Whitebox
+ if (ledgerHandle == null) {
+ return false; // handle not available yet; Awaitility evaluates the condition again
+ }
+ ledgerHandle.close();
+ return true; // handle closed; stop waiting
+ });
+ }
+
@Test(timeOut = 20000)
void readFromEmptyLedger() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");