You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/14 11:56:11 UTC
[pulsar] branch branch-2.9 updated: If mark-delete operation fails, mark the cursor as "dirty" (#14256)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new d692f8b If mark-delete operation fails, mark the cursor as "dirty" (#14256)
d692f8b is described below
commit d692f8b25e3c943f06481b5bda1f337f4148c1b4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 13 03:05:52 2022 -0800
If mark-delete operation fails, mark the cursor as "dirty" (#14256)
(cherry picked from commit 8928c3496a61c588b50461d6adaab089dd421619)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 1 +
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 51 ++++++++++++++++++++++
2 files changed, 52 insertions(+)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d37ecda..8c345026 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1835,6 +1835,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void operationFailed(ManagedLedgerException exception) {
+ isDirty = true;
log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
ManagedCursorImpl.this, mdEntry.newPosition);
if (log.isDebugEnabled()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a00a817..676e92f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3508,6 +3508,57 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
});
}
+
+
+ @Test
+ public void testFlushCursorAfterError() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setThrottleMarkDelete(1.0);
+
+ ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
+ factoryConfig.setCursorPositionFlushSeconds(1);
+
+ @Cleanup("shutdown")
+ ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
+ ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config);
+ ManagedCursor c1 = ledger1.openCursor("c");
+ List<Position> positions = new ArrayList<>();
+
+ for (int i = 0; i < 20; i++) {
+ positions.add(ledger1.addEntry(new byte[1024]));
+ }
+
+ // Simulate BK write error
+ bkc.failNow(BKException.Code.NotEnoughBookiesException);
+ metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException(""));
+
+ try {
+ c1.markDelete(positions.get(positions.size() - 1));
+ fail("should have failed");
+ } catch (ManagedLedgerException e) {
+ // Expected
+ }
+
+ metadataStore.unsetAlwaysFail();
+
+ // In memory position is updated
+ assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+
+ Awaitility.await()
+ // Give chance to the flush to be automatically triggered.
+ // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting
+ .pollDelay(Duration.ofMillis(2000))
+ .untilAsserted(() -> {
+ // Abruptly re-open the managed ledger without graceful close
+ @Cleanup("shutdown")
+ ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
+ ManagedCursor c2 = ledger2.openCursor("c");
+
+ assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+ });
+ }
+
@Test
public void testConsistencyOfIndividualMessages() throws Exception {
ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");