You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/31 21:11:35 UTC
[pulsar] branch master updated: When cursor recovery encounters
empty cursor ledger, fallback to latest snapshot (#3487)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9d879 When cursor recovery encounters empty cursor ledger, fallback to latest snapshot (#3487)
7d9d879 is described below
commit 7d9d879bedec749f0cb8836267e5fb1ec2d6fa52
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jan 31 13:11:29 2019 -0800
When cursor recovery encounters empty cursor ledger, fallback to latest snapshot (#3487)
* When cursor recovery encounters empty cursor ledger, fallback to latest snapshot
* Remove post-decrement comparison
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++++++-
.../bookkeeper/client/PulsarMockBookKeeper.java | 15 +++++++++++
.../bookkeeper/client/PulsarMockLedgerHandle.java | 6 ++++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 31 ++++++++++++++++++++++
4 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ca5d2df..bcfb858 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -282,6 +282,15 @@ public class ManagedCursorImpl implements ManagedCursor {
// Read the last entry in the ledger
long lastEntryInLedger = lh.getLastAddConfirmed();
+
+ if (lastEntryInLedger < 0) {
+ log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
+ ledger.getName(), ledgerId, name);
+ // Rewind to last cursor snapshot available
+ initialize(getRollbackPosition(info), callback);
+ return;
+ }
+
lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
@@ -2059,7 +2068,7 @@ public class ManagedCursorImpl implements ManagedCursor {
});
}));
}, Collections.emptyMap());
-
+
}
private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f3689e9..fa49e57 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -249,6 +249,12 @@ public class PulsarMockBookKeeper extends BookKeeper {
}
}
+ synchronized boolean checkReturnEmptyLedger() {
+ boolean shouldFailNow = (emptyLedgerAfter == 0);
+ --emptyLedgerAfter;
+ return shouldFailNow;
+ }
+
synchronized CompletableFuture<Void> getProgrammedFailure() {
return failures.isEmpty() ? defaultResponse : failures.remove(0);
}
@@ -261,6 +267,15 @@ public class PulsarMockBookKeeper extends BookKeeper {
promiseAfter(steps).completeExceptionally(BKException.create(rc));
}
+ private int emptyLedgerAfter = -1;
+
+ /**
+ * After N times, make a ledger to appear to be empty
+ */
+ public synchronized void returnEmptyLedgerAfter(int steps) {
+ emptyLedgerAfter = steps;
+ }
+
public synchronized CompletableFuture<Void> promiseAfter(int steps) {
while (failures.size() <= steps) {
failures.add(defaultResponse);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 2397a62..0c6535d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -191,7 +191,11 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
@Override
public long getLastAddConfirmed() {
- return lastEntry;
+ if (bk.checkReturnEmptyLedger()) {
+ return -1;
+ } else {
+ return lastEntry;
+ }
}
@Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 6d90016..5b64468 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1091,6 +1091,37 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
}
+ @Test
+ void failDuringRecoveryWithEmptyLedger() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger");
+ ManagedCursor cursor = ledger.openCursor("cursor");
+
+ ledger.addEntry("entry-1".getBytes());
+ Position p2 = ledger.addEntry("entry-2".getBytes());
+ Position p3 = ledger.addEntry("entry-3".getBytes());
+
+ cursor.markDelete(p2);
+ // Do graceful close so snapshot is forced
+ ledger.close();
+
+ // Re-open
+ ledger = factory.open("my_test_ledger");
+ cursor = ledger.openCursor("cursor");
+ cursor.markDelete(p3);
+
+ // Force-reopen so the recovery will be forced to read from ledger
+ bkc.returnEmptyLedgerAfter(1);
+ ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+ ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+ ledger = factory2.open("my_test_ledger");
+ cursor = ledger.openCursor("cursor");
+
+ // Cursor was rolled back to p2 because of the ledger recovery failure
+ assertEquals(cursor.getMarkDeletedPosition(), p2);
+
+ factory2.shutdown();
+ }
+
@Test(timeOut = 20000)
void errorRecoveringCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");