You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/08/31 18:37:09 UTC

[pulsar] branch master updated: [improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was failed (#20935)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 843b8307f44 [improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was failed (#20935)
843b8307f44 is described below

commit 843b8307f44cd5e3a2d59ab93cc6b766f0c4ce0f
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Sep 1 02:37:00 2023 +0800

    [improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was failed (#20935)
    
    Persisting the mark-deleted position works as follows:
    - Persist it to BK.
    - If persisting to BK fails, try to persist it to ZK.
    
    However, in the current implementation, if creating the cursor ledger fails, Pulsar does not try to persist to ZK at all. As a result, when cursor ledger creation fails, many acknowledgments cannot be persisted, and consumers see a large amount of repeated consumption after BK recovers.
    
    Modifications: Try to persist the mark-deleted position to ZK if creating the cursor ledger failed.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  77 ++++++++++------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  28 ++++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 102 +++++++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  64 +++++++++++++
 4 files changed, 241 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8ce3a322c09..e2b202cce15 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2099,7 +2099,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             }
         });
 
-        persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
+        VoidCallback cb = new VoidCallback() {
             @Override
             public void operationComplete() {
                 if (log.isDebugEnabled()) {
@@ -2151,7 +2151,18 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 mdEntry.triggerFailed(exception);
             }
-        });
+        };
+
+        if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
+            if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
+                persistPositionToMetaStore(mdEntry, cb);
+            } else {
+                mdEntry.callback.markDeleteFailed(new ManagedLedgerException("Create new cursor ledger failed"),
+                        mdEntry.ctx);
+            }
+        } else {
+            persistPositionToLedger(cursorLedger, mdEntry, cb);
+        }
     }
 
     @Override
@@ -2797,16 +2808,15 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
-                log.error("[{}][{}] Metadata ledger creation failed", ledger.getName(), name, exception);
+                log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
+                        + " store.", ledger.getName(), name, exception);
 
                 synchronized (pendingMarkDeleteOps) {
-                    while (!pendingMarkDeleteOps.isEmpty()) {
-                        MarkDeleteEntry entry = pendingMarkDeleteOps.poll();
-                        entry.callback.markDeleteFailed(exception, entry.ctx);
-                    }
-
                     // At this point we don't have a ledger ready
                     STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
+                    // Note: if the stat is NoLedger, will persist the mark deleted position to metadata store.
+                    // Before giving up, try to persist the position in the metadata store.
+                    flushPendingMarkDeletes();
                 }
             }
         });
@@ -3073,32 +3083,39 @@ public class ManagedCursorImpl implements ManagedCursor {
                 // in the meantime the mark-delete will be queued.
                 STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
 
-                mbean.persistToLedger(false);
-                // Before giving up, try to persist the position in the metadata store
-                persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() {
-                    @Override
-                    public void operationComplete(Void result, Stat stat) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(
-                                    "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
-                                    + " {}", ledger.getName(), name, position);
-                        }
-                        mbean.persistToZookeeper(true);
-                        callback.operationComplete();
-                    }
-
-                    @Override
-                    public void operationFailed(MetaStoreException e) {
-                        log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
-                                ledger.getName(), name, e.getMessage());
-                        mbean.persistToZookeeper(false);
-                        callback.operationFailed(createManagedLedgerException(rc));
-                    }
-                }, true);
+                // Before giving up, try to persist the position in the metadata store.
+                persistPositionToMetaStore(mdEntry, callback);
             }
         }, null);
     }
 
+    void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
+        final PositionImpl newPosition = mdEntry.newPosition;
+        STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
+        mbean.persistToLedger(false);
+        // Before giving up, try to persist the position in the metadata store
+        persistPositionMetaStore(-1, newPosition, mdEntry.properties, new MetaStoreCallback<Void>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug(
+                            "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
+                            + " {}", ledger.getName(), name, newPosition);
+                }
+                mbean.persistToZookeeper(true);
+                callback.operationComplete();
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
+                        ledger.getName(), name, e.getMessage());
+                mbean.persistToZookeeper(false);
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        }, true);
+    }
+
     boolean shouldCloseLedger(LedgerHandle lh) {
         long now = clock.millis();
         if (ledger.getFactory().isMetadataServiceAvailable()
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c31a0c38cd3..89c18f4b834 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3552,6 +3552,34 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return positionToReturn;
     }
 
+    public boolean isNoMessagesAfterPos(PositionImpl pos) {
+        PositionImpl lac = (PositionImpl) getLastConfirmedEntry();
+        return isNoMessagesAfterPosForSpecifiedLac(lac, pos);
+    }
+
+    private boolean isNoMessagesAfterPosForSpecifiedLac(PositionImpl specifiedLac, PositionImpl pos) {
+        if (pos.compareTo(specifiedLac) >= 0) {
+            return true;
+        }
+        if (specifiedLac.getEntryId() < 0) {
+            // Calculate the meaningful LAC.
+            PositionImpl actLac = getPreviousPosition(specifiedLac);
+            if (actLac.getEntryId() >= 0) {
+                return pos.compareTo(actLac) >= 0;
+            } else {
+                // If the actual LAC is still not meaningful.
+                if (actLac.equals(specifiedLac)) {
+                    // No entries in maneged ledger.
+                    return true;
+                } else {
+                    // Continue to find a valid LAC.
+                    return isNoMessagesAfterPosForSpecifiedLac(actLac, pos);
+                }
+            }
+        }
+        return false;
+    }
+
     /**
      * Get the entry position that come before the specified position in the message stream, using information from the
      * ledger list and each ledger entries count.
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1b1b5534256..627ae73d928 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -229,6 +229,97 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         entries.forEach(Entry::release);
     }
 
+    @Test
+    void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
+        final int entryCount = 10;
+        final String cursorName = "c1";
+        final String mlName = "ml_test";
+        final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+
+        ManagedCursor cursor = ml.openCursor("c1");
+        Position lastEntry = null;
+        for (int i = 0; i < 10; i++) {
+            lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        // Mock cursor ledger create failed.
+        bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+        cursor.markDelete(lastEntry);
+
+        // Assert persist mark deleted position to ZK was successful.
+        PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+        assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+        assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+        assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+        assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);
+        assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
+        // Verify the mark delete position can be recovered properly.
+        ml.close();
+        ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+        ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
+        assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
+
+        // cleanup.
+        ml.delete();
+    }
+
+    @Test
+    void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
+        final int entryCount = 10;
+        final String cursorName = "c1";
+        final String mlName = "ml_test";
+        final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(cursorName);
+        ArrayList<Position> positions = new ArrayList<>();
+        for (int i = 0; i < entryCount; i++) {
+            positions.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+        }
+        // Trigger the cursor ledger creating.
+        cursor.markDelete(positions.get(0));
+        assertTrue(cursor.getStats().getPersistLedgerSucceed() > 0);
+
+        // Mock cursor ledger write failed.
+        bkc.addEntryFailAfter(0, BKException.Code.NoBookieAvailableException);
+        // Trigger a failed writing of the cursor ledger, then wait the stat of cursor to be "NoLedger".
+        // This time ZK will be written due to a failure to write BK.
+        cursor.markDelete(positions.get(1));
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getState(), "NoLedger");
+        });
+        assertTrue(cursor.getStats().getPersistLedgerErrors() > 0);
+        long persistZookeeperSucceed1 = cursor.getStats().getPersistZookeeperSucceed();
+        assertTrue(persistZookeeperSucceed1 > 0);
+
+        // Mock cursor ledger create failed.
+        bkc.failNow(BKException.Code.NoBookieAvailableException);
+        // Verify the cursor status will be persistent to ZK even if the cursor ledger creation always fails.
+        // This time ZK will be written due to catch up.
+        Position lastEntry = positions.get(entryCount -1);
+        cursor.markDelete(lastEntry);
+        long persistZookeeperSucceed2 = cursor.getStats().getPersistZookeeperSucceed();
+        assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1);
+
+        // Assert persist mark deleted position to ZK was successful.
+        PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+        assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+        assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+        assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
+        // Verify the mark delete position can be recovered properly.
+        ml.close();
+        ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+        ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
+        assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
+
+        // cleanup.
+        ml.delete();
+    }
+
     @Test(timeOut = 20000)
     void readWithCacheDisabled() throws Exception {
         ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
@@ -1421,6 +1512,17 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ledger = factory2.open("my_test_ledger");
         ManagedCursor cursor = ledger.openCursor("c1");
         Position position = ledger.addEntry("test".getBytes());
+        // Make persist zk fail once.
+        AtomicInteger persistZKTimes = new AtomicInteger();
+        metadataStore.failConditional(new MetadataStoreException.BadVersionException("mock ex"), (type, path) -> {
+            if (FaultInjectionMetadataStore.OperationType.PUT.equals(type)
+                    && path.equals("/managed-ledgers/my_test_ledger/c1")) {
+                if (persistZKTimes.incrementAndGet() == 1) {
+                    return true;
+                }
+            }
+            return false;
+        });
         try {
             cursor.markDelete(position);
             fail("should have failed");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 70ddbb9998f..5fc2da22b66 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3968,6 +3968,70 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
     }
 
+    @Test
+    public void testIsNoMessagesAfterPos() throws Exception {
+        final byte[] data = new byte[]{1,2,3};
+        final String cursorName = "c1";
+        final String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+        final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+        final ManagedCursor managedCursor = ml.openCursor(cursorName);
+
+        // One ledger.
+        PositionImpl p1 = (PositionImpl) ml.addEntry(data);
+        PositionImpl p2 = (PositionImpl) ml.addEntry(data);
+        PositionImpl p3 = (PositionImpl) ml.addEntry(data);
+        assertFalse(ml.isNoMessagesAfterPos(p1));
+        assertFalse(ml.isNoMessagesAfterPos(p2));
+        assertTrue(ml.isNoMessagesAfterPos(p3));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p3.getLedgerId() + 1, -1)));
+
+        // More than one ledger.
+        ml.ledgerClosed(ml.currentLedger);
+        PositionImpl p4 = (PositionImpl) ml.addEntry(data);
+        PositionImpl p5 = (PositionImpl) ml.addEntry(data);
+        PositionImpl p6 = (PositionImpl) ml.addEntry(data);
+        assertFalse(ml.isNoMessagesAfterPos(p1));
+        assertFalse(ml.isNoMessagesAfterPos(p2));
+        assertFalse(ml.isNoMessagesAfterPos(p3));
+        assertFalse(ml.isNoMessagesAfterPos(p4));
+        assertFalse(ml.isNoMessagesAfterPos(p5));
+        assertTrue(ml.isNoMessagesAfterPos(p6));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(), p6.getEntryId() + 1)));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() + 1, -1)));
+
+        // Switch ledger and make the entry id of Last confirmed entry is -1;
+        ml.ledgerClosed(ml.currentLedger);
+        ml.createLedgerAfterClosed();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml.currentLedgerEntries, 0);
+        });
+        ml.lastConfirmedEntry = PositionImpl.get(ml.currentLedger.getId(), -1);
+        assertFalse(ml.isNoMessagesAfterPos(p5));
+        assertTrue(ml.isNoMessagesAfterPos(p6));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(), p6.getEntryId() + 1)));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() + 1, -1)));
+
+        // Trim ledgers to make there is no entries in ML.
+        ml.deleteCursor(cursorName);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ml.trimConsumedLedgersInBackground(true, future);
+        future.get();
+        assertEquals(ml.ledgers.size(), 1);
+        assertEquals(ml.lastConfirmedEntry.getEntryId(), -1);
+        assertTrue(ml.isNoMessagesAfterPos(p1));
+        assertTrue(ml.isNoMessagesAfterPos(p2));
+        assertTrue(ml.isNoMessagesAfterPos(p3));
+        assertTrue(ml.isNoMessagesAfterPos(p4));
+        assertTrue(ml.isNoMessagesAfterPos(p5));
+        assertTrue(ml.isNoMessagesAfterPos(p6));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(), p6.getEntryId() + 1)));
+        assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() + 1, -1)));
+
+        // cleanup.
+        ml.close();
+    }
+
     @Test
     public void testGetEstimatedBacklogSize() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();