You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/31 21:11:35 UTC

[pulsar] branch master updated: When cursor recovery encounters empty cursor ledger, fallback to latest snapshot (#3487)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9d879  When cursor recovery encounters empty cursor ledger, fallback to latest snapshot (#3487)
7d9d879 is described below

commit 7d9d879bedec749f0cb8836267e5fb1ec2d6fa52
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jan 31 13:11:29 2019 -0800

    When cursor recovery encounters empty cursor ledger, fallback to latest snapshot (#3487)
    
    * When cursor recovery encounters empty cursor ledger, fallback to latest snapshot
    
    * Remove post-decrement comparison
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++++++-
 .../bookkeeper/client/PulsarMockBookKeeper.java    | 15 +++++++++++
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  6 ++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 31 ++++++++++++++++++++++
 4 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ca5d2df..bcfb858 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -282,6 +282,15 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             // Read the last entry in the ledger
             long lastEntryInLedger = lh.getLastAddConfirmed();
+
+            if (lastEntryInLedger < 0) {
+                log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
+                        ledger.getName(), ledgerId, name);
+                // Rewind to last cursor snapshot available
+                initialize(getRollbackPosition(info), callback);
+                return;
+            }
+
             lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
@@ -2059,7 +2068,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 });
             }));
         }, Collections.emptyMap());
-       
+
     }
 
     private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f3689e9..fa49e57 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -249,6 +249,12 @@ public class PulsarMockBookKeeper extends BookKeeper {
         }
     }
 
+    synchronized boolean checkReturnEmptyLedger() {
+        boolean shouldFailNow = (emptyLedgerAfter == 0);
+        --emptyLedgerAfter;
+        return shouldFailNow;
+    }
+
     synchronized CompletableFuture<Void> getProgrammedFailure() {
         return failures.isEmpty() ? defaultResponse : failures.remove(0);
     }
@@ -261,6 +267,15 @@ public class PulsarMockBookKeeper extends BookKeeper {
         promiseAfter(steps).completeExceptionally(BKException.create(rc));
     }
 
+    private int emptyLedgerAfter = -1;
+
+    /**
+     * After N times, make a ledger to appear to be empty
+     */
+    public synchronized void returnEmptyLedgerAfter(int steps) {
+        emptyLedgerAfter = steps;
+    }
+
     public synchronized CompletableFuture<Void> promiseAfter(int steps) {
         while (failures.size() <= steps) {
             failures.add(defaultResponse);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 2397a62..0c6535d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -191,7 +191,11 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
 
     @Override
     public long getLastAddConfirmed() {
-        return lastEntry;
+        if (bk.checkReturnEmptyLedger()) {
+            return -1;
+        } else {
+            return lastEntry;
+        }
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 6d90016..5b64468 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1091,6 +1091,37 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         }
     }
 
+    @Test
+    void failDuringRecoveryWithEmptyLedger() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger");
+        ManagedCursor cursor = ledger.openCursor("cursor");
+
+        ledger.addEntry("entry-1".getBytes());
+        Position p2 = ledger.addEntry("entry-2".getBytes());
+        Position p3 = ledger.addEntry("entry-3".getBytes());
+
+        cursor.markDelete(p2);
+        // Do graceful close so snapshot is forced
+        ledger.close();
+
+        // Re-open
+        ledger = factory.open("my_test_ledger");
+        cursor = ledger.openCursor("cursor");
+        cursor.markDelete(p3);
+
+        // Force-reopen so the recovery will be forced to read from ledger
+        bkc.returnEmptyLedgerAfter(1);
+        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+        ledger = factory2.open("my_test_ledger");
+        cursor = ledger.openCursor("cursor");
+
+        // Cursor was rolled back to p2 because of the ledger recovery failure
+        assertEquals(cursor.getMarkDeletedPosition(), p2);
+
+        factory2.shutdown();
+    }
+
     @Test(timeOut = 20000)
     void errorRecoveringCursor() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");