You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/09/15 08:46:13 UTC
[pulsar] 05/09: [fix][broker]Persist cursor info error when cursor close (#17255)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit abe1f28b8e1688420f8923890cea2f8d15c1ec69
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Aug 25 09:36:51 2022 +0800
[fix][broker]Persist cursor info error when cursor close (#17255)
(cherry picked from commit 46b1239a23fd2a250b009df1e9b4a65341d299a7)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 18 ++++++++++--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 34 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d934d0290a8..2d2597e15e2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2458,8 +2458,22 @@ public class ManagedCursorImpl implements ManagedCursor {
callback.closeComplete(ctx);
return;
}
- persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
- STATE_UPDATER.set(this, State.Closed);
+ persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties,
+ new AsyncCallbacks.CloseCallback(){
+
+ @Override
+ public void closeComplete(Object ctx) {
+ STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed);
+ callback.closeComplete(ctx);
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] [{}] persistent position failure when closing, the state will remain in"
+ + " state-closing and will no longer work", ledger.getName(), name);
+ callback.closeFailed(exception, ctx);
+ }
+ }, ctx);
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 864b34d19e8..72573264d04 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -71,6 +71,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -105,6 +106,7 @@ import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -131,6 +133,38 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
+ @Test
+ public void testCloseCursor() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setThrottleMarkDelete(0);
+ ManagedLedger ledger = factory.open("my_test_ledger", config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ // Write five single-byte entries.
+ ledger.addEntry(new byte[]{1});
+ ledger.addEntry(new byte[]{2});
+ ledger.addEntry(new byte[]{3});
+ ledger.addEntry(new byte[]{4});
+ ledger.addEntry(new byte[]{5});
+ // Delete the entry at the read position, then wait until getPersistLedgerSucceed() reports at least one success.
+ c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
+ Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
+ // Close the cursor ledger handle so that the cursor ledger can no longer work.
+ closeCursorLedger(c1);
+ c1.delete(PositionImpl.get(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId() + 2));
+ ledger.close(); // NOTE(review): no explicit assertions here; the test passes as long as nothing above throws.
+ }
+
+ private static void closeCursorLedger(ManagedCursorImpl managedCursor) { // Closes the cursor's "cursorLedger" handle once it is non-null.
+ Awaitility.await().until(() -> {
+ LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger"); // read the internal field via Whitebox
+ if (ledgerHandle == null) {
+ return false; // handle not available yet; the awaited condition is not satisfied
+ }
+ ledgerHandle.close();
+ return true; // handle closed; stop waiting
+ });
+ }
+
@Test(timeOut = 20000)
void readFromEmptyLedger() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");