You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/14 11:56:11 UTC

[pulsar] branch branch-2.9 updated: If mark-delete operation fails, mark the cursor as "dirty" (#14256)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new d692f8b  If mark-delete operation fails, mark the cursor as "dirty" (#14256)
d692f8b is described below

commit d692f8b25e3c943f06481b5bda1f337f4148c1b4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 13 03:05:52 2022 -0800

    If mark-delete operation fails, mark the cursor as "dirty" (#14256)
    
    (cherry picked from commit 8928c3496a61c588b50461d6adaab089dd421619)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 51 ++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d37ecda..8c345026 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1835,6 +1835,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
+                isDirty = true;
                 log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
                         ManagedCursorImpl.this, mdEntry.newPosition);
                 if (log.isDebugEnabled()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a00a817..676e92f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3508,6 +3508,57 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 });
     }
 
+
+
+    @Test
+    public void testFlushCursorAfterError() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1.0);
+
+        ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
+        factoryConfig.setCursorPositionFlushSeconds(1);
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
+        ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor c1 = ledger1.openCursor("c");
+        List<Position> positions = new ArrayList<>();
+
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        // Simulate BK write error
+        bkc.failNow(BKException.Code.NotEnoughBookiesException);
+        metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException(""));
+
+        try {
+            c1.markDelete(positions.get(positions.size() - 1));
+            fail("should have failed");
+        } catch (ManagedLedgerException e) {
+            // Expected
+        }
+
+        metadataStore.unsetAlwaysFail();
+
+        // In memory position is updated
+        assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+
+        Awaitility.await()
+                // Give chance to the flush to be automatically triggered.
+                // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting
+                .pollDelay(Duration.ofMillis(2000))
+                .untilAsserted(() -> {
+                    // Abruptly re-open the managed ledger without graceful close
+                    @Cleanup("shutdown")
+                    ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+                    ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
+                    ManagedCursor c2 = ledger2.openCursor("c");
+
+                    assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+                });
+    }
+
     @Test
     public void testConsistencyOfIndividualMessages() throws Exception {
         ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");