You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/08/31 18:37:09 UTC
[pulsar] branch master updated: [improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was failed (#20935)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 843b8307f44 [improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was failed (#20935)
843b8307f44 is described below
commit 843b8307f44cd5e3a2d59ab93cc6b766f0c4ce0f
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Sep 1 02:37:00 2023 +0800
[improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was failed (#20935)
The process of persisting the mark-deleted position is:
- persist to BK
- If failed to persist to BK, try to persist to ZK
However, in the current implementation, if creating the cursor ledger fails, Pulsar does not try to persist the position to ZK. As a result, while cursor ledger creation keeps failing, many acknowledgements cannot be persisted, and consumers see a lot of repeated consumption after BK recovers.
Modifications: Try to persist the mark-deleted position to ZK if creating the cursor ledger failed.
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 77 ++++++++++------
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 28 ++++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 102 +++++++++++++++++++++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 64 +++++++++++++
4 files changed, 241 insertions(+), 30 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8ce3a322c09..e2b202cce15 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2099,7 +2099,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
});
- persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
+ VoidCallback cb = new VoidCallback() {
@Override
public void operationComplete() {
if (log.isDebugEnabled()) {
@@ -2151,7 +2151,18 @@ public class ManagedCursorImpl implements ManagedCursor {
mdEntry.triggerFailed(exception);
}
- });
+ };
+
+ if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
+ if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
+ persistPositionToMetaStore(mdEntry, cb);
+ } else {
+ mdEntry.callback.markDeleteFailed(new ManagedLedgerException("Create new cursor ledger failed"),
+ mdEntry.ctx);
+ }
+ } else {
+ persistPositionToLedger(cursorLedger, mdEntry, cb);
+ }
}
@Override
@@ -2797,16 +2808,15 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void operationFailed(ManagedLedgerException exception) {
- log.error("[{}][{}] Metadata ledger creation failed", ledger.getName(), name, exception);
+ log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
+ + " store.", ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
- while (!pendingMarkDeleteOps.isEmpty()) {
- MarkDeleteEntry entry = pendingMarkDeleteOps.poll();
- entry.callback.markDeleteFailed(exception, entry.ctx);
- }
-
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
+ // Note: if the stat is NoLedger, will persist the mark deleted position to metadata store.
+ // Before giving up, try to persist the position in the metadata store.
+ flushPendingMarkDeletes();
}
}
});
@@ -3073,32 +3083,39 @@ public class ManagedCursorImpl implements ManagedCursor {
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
- mbean.persistToLedger(false);
- // Before giving up, try to persist the position in the metadata store
- persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() {
- @Override
- public void operationComplete(Void result, Stat stat) {
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
- + " {}", ledger.getName(), name, position);
- }
- mbean.persistToZookeeper(true);
- callback.operationComplete();
- }
-
- @Override
- public void operationFailed(MetaStoreException e) {
- log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
- ledger.getName(), name, e.getMessage());
- mbean.persistToZookeeper(false);
- callback.operationFailed(createManagedLedgerException(rc));
- }
- }, true);
+ // Before giving up, try to persist the position in the metadata store.
+ persistPositionToMetaStore(mdEntry, callback);
}
}, null);
}
+ void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
+ final PositionImpl newPosition = mdEntry.newPosition;
+ STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); // only Open -> NoLedger
+ mbean.persistToLedger(false); // stats: this position was not persisted to the cursor ledger
+ // Fallback when the cursor ledger cannot be used: write newPosition and its properties to the metadata store.
+ persistPositionMetaStore(-1, newPosition, mdEntry.properties, new MetaStoreCallback<Void>() {
+ @Override
+ public void operationComplete(Void result, Stat stat) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
+ + " {}", ledger.getName(), name, newPosition);
+ }
+ mbean.persistToZookeeper(true);
+ callback.operationComplete();
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
+ ledger.getName(), name, e.getMessage());
+ mbean.persistToZookeeper(false);
+ callback.operationFailed(createManagedLedgerException(e)); // reports the meta-store error, not the BK rc
+ }
+ }, true);
+ }
+
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.getFactory().isMetadataServiceAvailable()
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c31a0c38cd3..89c18f4b834 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3552,6 +3552,34 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return positionToReturn;
}
+ public boolean isNoMessagesAfterPos(PositionImpl pos) {
+ PositionImpl lac = (PositionImpl) getLastConfirmedEntry(); // entryId can be -1, e.g. right after a ledger switch
+ return isNoMessagesAfterPosForSpecifiedLac(lac, pos); // true iff no entry exists after pos
+ }
+
+ private boolean isNoMessagesAfterPosForSpecifiedLac(PositionImpl specifiedLac, PositionImpl pos) {
+ if (pos.compareTo(specifiedLac) >= 0) { // pos is at or after the given LAC
+ return true;
+ }
+ if (specifiedLac.getEntryId() < 0) {
+ // A LAC with entryId < 0 points at no real entry (e.g. a new, empty ledger); step back one position.
+ PositionImpl actLac = getPreviousPosition(specifiedLac);
+ if (actLac.getEntryId() >= 0) {
+ return pos.compareTo(actLac) >= 0;
+ } else {
+ // The previous position is not a real entry either.
+ if (actLac.equals(specifiedLac)) {
+ // getPreviousPosition made no progress: there are no entries in the managed ledger.
+ return true;
+ } else {
+ // Keep walking back until a real entry (entryId >= 0) is found or no progress is made.
+ return isNoMessagesAfterPosForSpecifiedLac(actLac, pos);
+ }
+ }
+ }
+ return false; // pos is before a real LAC entry, so at least one entry follows it
+ }
+
/**
* Get the entry position that come before the specified position in the message stream, using information from the
* ledger list and each ledger entries count.
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1b1b5534256..627ae73d928 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -229,6 +229,97 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
entries.forEach(Entry::release);
}
+ @Test
+ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
+ final int entryCount = 10;
+ final String cursorName = "c1";
+ final String mlName = "ml_test";
+ final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+
+ ManagedCursor cursor = ml.openCursor("c1"); // NOTE(review): same value as cursorName
+ Position lastEntry = null;
+ for (int i = 0; i < 10; i++) { // NOTE(review): 10 duplicates entryCount
+ lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+ }
+
+ // Simulate a failure when creating the cursor ledger.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+ cursor.markDelete(lastEntry);
+
+ // The mark-delete position must have been persisted to ZK instead of to a cursor ledger.
+ PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+ assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+ assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);
+ assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Reopen the managed ledger and verify the cursor recovers the persisted position.
+ ml.close();
+ ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+ ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
+ assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Cleanup.
+ ml.delete();
+ }
+
+ @Test
+ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
+ final int entryCount = 10;
+ final String cursorName = "c1";
+ final String mlName = "ml_test";
+ final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+
+ final ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(cursorName);
+ ArrayList<Position> positions = new ArrayList<>();
+ for (int i = 0; i < entryCount; i++) {
+ positions.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+ }
+ // The first mark-delete creates the cursor ledger and persists the position to it.
+ cursor.markDelete(positions.get(0));
+ assertTrue(cursor.getStats().getPersistLedgerSucceed() > 0);
+
+ // Simulate failing writes to the cursor ledger.
+ bkc.addEntryFailAfter(0, BKException.Code.NoBookieAvailableException);
+ // Trigger a failed write to the cursor ledger, then wait for the cursor state to become "NoLedger".
+ // This time ZK is written because the write to BK failed.
+ cursor.markDelete(positions.get(1));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cursor.getState(), "NoLedger");
+ });
+ assertTrue(cursor.getStats().getPersistLedgerErrors() > 0);
+ long persistZookeeperSucceed1 = cursor.getStats().getPersistZookeeperSucceed();
+ assertTrue(persistZookeeperSucceed1 > 0);
+
+ // Simulate a failure when creating a new cursor ledger.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+ // Verify the cursor position is persisted to ZK even if cursor ledger creation keeps failing.
+ // This time ZK is written because the mark-delete position has caught up with the last entry.
+ Position lastEntry = positions.get(entryCount -1);
+ cursor.markDelete(lastEntry);
+ long persistZookeeperSucceed2 = cursor.getStats().getPersistZookeeperSucceed();
+ assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1);
+
+ // The mark-delete position must have been persisted to ZK.
+ PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId()); // NOTE(review): component-wise check
+ assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Reopen the managed ledger and verify the cursor recovers the persisted position.
+ ml.close();
+ ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+ ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
+ assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Cleanup.
+ ml.delete();
+ }
+
@Test(timeOut = 20000)
void readWithCacheDisabled() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
@@ -1421,6 +1512,17 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ledger = factory2.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");
Position position = ledger.addEntry("test".getBytes());
+ // Make persist zk fail once.
+ AtomicInteger persistZKTimes = new AtomicInteger();
+ metadataStore.failConditional(new MetadataStoreException.BadVersionException("mock ex"), (type, path) -> {
+ if (FaultInjectionMetadataStore.OperationType.PUT.equals(type)
+ && path.equals("/managed-ledgers/my_test_ledger/c1")) {
+ if (persistZKTimes.incrementAndGet() == 1) {
+ return true;
+ }
+ }
+ return false;
+ });
try {
cursor.markDelete(position);
fail("should have failed");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 70ddbb9998f..5fc2da22b66 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3968,6 +3968,70 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
}
+ @Test
+ public void testIsNoMessagesAfterPos() throws Exception {
+ final byte[] data = new byte[]{1,2,3};
+ final String cursorName = "c1";
+ final String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+ final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+ final ManagedCursor managedCursor = ml.openCursor(cursorName);
+
+ // Case 1: all entries are in a single ledger.
+ PositionImpl p1 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p2 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p3 = (PositionImpl) ml.addEntry(data);
+ assertFalse(ml.isNoMessagesAfterPos(p1));
+ assertFalse(ml.isNoMessagesAfterPos(p2));
+ assertTrue(ml.isNoMessagesAfterPos(p3));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p3.getLedgerId() + 1, -1)));
+
+ // Case 2: entries span more than one ledger.
+ ml.ledgerClosed(ml.currentLedger);
+ PositionImpl p4 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p5 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p6 = (PositionImpl) ml.addEntry(data);
+ assertFalse(ml.isNoMessagesAfterPos(p1));
+ assertFalse(ml.isNoMessagesAfterPos(p2));
+ assertFalse(ml.isNoMessagesAfterPos(p3));
+ assertFalse(ml.isNoMessagesAfterPos(p4));
+ assertFalse(ml.isNoMessagesAfterPos(p5));
+ assertTrue(ml.isNoMessagesAfterPos(p6));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(), p6.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() + 1, -1)));
+
+ // Case 3: switch to a new, empty ledger so the last confirmed entry has entry id -1.
+ ml.ledgerClosed(ml.currentLedger);
+ ml.createLedgerAfterClosed();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.currentLedgerEntries, 0);
+ });
+ ml.lastConfirmedEntry = PositionImpl.get(ml.currentLedger.getId(), -1);
+ assertFalse(ml.isNoMessagesAfterPos(p5));
+ assertTrue(ml.isNoMessagesAfterPos(p6));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(), p6.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() + 1, -1)));
+
+ // Case 4: trim consumed ledgers so that the managed ledger has no entries left.
+ ml.deleteCursor(cursorName);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ml.trimConsumedLedgersInBackground(true, future);
+ future.get();
+ assertEquals(ml.ledgers.size(), 1);
+ assertEquals(ml.lastConfirmedEntry.getEntryId(), -1);
+ assertTrue(ml.isNoMessagesAfterPos(p1));
+ assertTrue(ml.isNoMessagesAfterPos(p2));
+ assertTrue(ml.isNoMessagesAfterPos(p3));
+ assertTrue(ml.isNoMessagesAfterPos(p4));
+ assertTrue(ml.isNoMessagesAfterPos(p5));
+ assertTrue(ml.isNoMessagesAfterPos(p6));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(), p6.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() + 1, -1)));
+
+ // Cleanup.
+ ml.close();
+ }
+
@Test
public void testGetEstimatedBacklogSize() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();