You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/31 06:14:27 UTC

[pulsar] 05/05: [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 05efd10278eb1a3df3f24b9a2add390283196b09
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Oct 30 22:48:43 2022 -0700

    [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  60 +++++-----
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 123 ++++++++++++++++++++-
 2 files changed, 153 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index da855197df6..4f1a376771c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1153,29 +1153,33 @@ public class ManagedCursorImpl implements ManagedCursor {
         return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
     }
 
-    protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
-        if (position.equals(PositionImpl.EARLIEST)) {
-            position = ledger.getFirstPosition();
-        } else if (position.equals(PositionImpl.LATEST)) {
-            position = ledger.getLastPosition().getNext();
+    protected void internalResetCursor(PositionImpl proposedReadPosition,
+                                       AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
+        final PositionImpl newReadPosition;
+        if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
+            newReadPosition = ledger.getFirstPosition();
+        } else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
+            newReadPosition = ledger.getLastPosition().getNext();
+        } else {
+            newReadPosition = proposedReadPosition;
         }
 
-        log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);
+        log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name);
 
         synchronized (pendingMarkDeleteOps) {
             if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
-                log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
-                        ledger.getName(), position, name);
+                log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}",
+                        ledger.getName(), newReadPosition, name);
                 resetCursorCallback.resetFailed(
                         new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
-                        position);
+                        newReadPosition);
                 return;
             }
         }
 
         final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;
 
-        final PositionImpl newPosition = position;
+        final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition);
 
         VoidCallback finalCallback = new VoidCallback() {
             @Override
@@ -1184,8 +1188,6 @@ public class ManagedCursorImpl implements ManagedCursor {
                 // modify mark delete and read position since we are able to persist new position for cursor
                 lock.writeLock().lock();
                 try {
-                    PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);
-
                     if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
                         MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
                                 Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
@@ -1200,34 +1202,34 @@ public class ManagedCursorImpl implements ManagedCursor {
                     if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
                         batchDeletedIndexes.clear();
-                        long[] resetWords = newPosition.ackSet;
+                        long[] resetWords = newReadPosition.ackSet;
                         if (resetWords != null) {
                             BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
-                            batchDeletedIndexes.put(newPosition, ackSet);
+                            batchDeletedIndexes.put(newReadPosition, ackSet);
                         }
                     }
 
                     PositionImpl oldReadPosition = readPosition;
-                    if (oldReadPosition.compareTo(newPosition) >= 0) {
-                        log.info("[{}] reset position to {} before current read position {} on cursor {}",
-                                ledger.getName(), newPosition, oldReadPosition, name);
+                    if (oldReadPosition.compareTo(newReadPosition) >= 0) {
+                        log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}",
+                                ledger.getName(), newReadPosition, oldReadPosition, name);
                     } else {
-                        log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
-                                ledger.getName(), newPosition, oldReadPosition, name);
+                        log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on "
+                                        + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name);
                     }
-                    readPosition = newPosition;
-                    ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
+                    readPosition = newReadPosition;
+                    ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
                 } finally {
                     lock.writeLock().unlock();
                 }
                 synchronized (pendingMarkDeleteOps) {
                     pendingMarkDeleteOps.clear();
                     if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
-                        log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
-                                ledger.getName(), newPosition, name);
+                        log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
+                                ledger.getName(), newReadPosition, name);
                     }
                 }
-                callback.resetComplete(newPosition);
+                callback.resetComplete(newReadPosition);
                 updateLastActive();
             }
 
@@ -1235,20 +1237,20 @@ public class ManagedCursorImpl implements ManagedCursor {
             public void operationFailed(ManagedLedgerException exception) {
                 synchronized (pendingMarkDeleteOps) {
                     if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
-                        log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
-                                ledger.getName(), newPosition, name);
+                        log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
+                                ledger.getName(), newReadPosition, name);
                     }
                 }
                 callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(
-                        "unable to persist position for cursor reset " + newPosition.toString()), newPosition);
+                        "unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition);
             }
 
         };
 
         persistentMarkDeletePosition = null;
         inProgressMarkDeletePersistPosition = null;
-        lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
-        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+        lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null);
+        internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
                 new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 0e66e76d5c3..1a8feea1e0d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -93,6 +93,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
@@ -676,7 +677,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         }
         assertTrue(moveStatus.get());
         PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1);
-        assertEquals(earliestPos, cursor.getReadPosition());
+        assertEquals(cursor.getReadPosition(), earliestPos);
         moveStatus.set(false);
 
         // reset to one after last entry in a ledger should point to the first entry in the next ledger
@@ -3283,6 +3284,126 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         });
     }
 
+    @Test(timeOut = 20000)
+    public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception {
+        ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry");
+        ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
+
+        // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        c.resetCursor(PositionImpl.LATEST);
+
+        // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
+        final Position readPositionBeforeRecover = c.getReadPosition();
+
+        // Trigger the lastConfirmedEntry to move forward
+        ml.addEntry(new byte[1]);
+
+        ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
+                .setCursorsLedgerId(c.getCursorLedger())
+                .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
+                .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
+                .setLastActive(0L)
+                .build();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        c.recoverFromLedger(info, new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                latch.countDown();
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                failed.set(true);
+                latch.countDown();
+            }
+        });
+
+        latch.await();
+        if (failed.get()) {
+            fail("Cursor recovery should not fail");
+        }
+        assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
+        assertEquals(c.getReadPosition(), readPositionBeforeRecover);
+        assertEquals(c.getNumberOfEntries(), 1L);
+    }
+
+    @Test(timeOut = 20000)
+    public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception {
+        ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries");
+        ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
+
+        // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        c.resetCursor(PositionImpl.LATEST);
+
+        // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+        assertEquals(c.getReadPosition().getEntryId(), 0);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+        // Trigger the lastConfirmedEntry to move forward
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+
+        c.resetCursor(PositionImpl.LATEST);
+
+        assertEquals(c.getMarkDeletedPosition().getEntryId(), 3);
+        assertEquals(c.getReadPosition().getEntryId(), 4);
+        assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3);
+
+        // Publish messages to move the lastConfirmedEntry field forward
+        ml.addEntry(new byte[1]);
+        ml.addEntry(new byte[1]);
+
+        final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
+        final Position readPositionBeforeRecover = c.getReadPosition();
+
+        ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
+                .setCursorsLedgerId(c.getCursorLedger())
+                .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
+                .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
+                .setLastActive(0L)
+                .build();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        c.recoverFromLedger(info, new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                latch.countDown();
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                failed.set(true);
+                latch.countDown();
+            }
+        });
+
+        latch.await();
+        if (failed.get()) {
+            fail("Cursor recovery should not fail");
+        }
+        assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
+        assertEquals(c.getReadPosition(), readPositionBeforeRecover);
+        assertEquals(c.getNumberOfEntries(), 2L);
+    }
     @Test
     void testAlwaysInactive() throws Exception {
         ManagedLedger ml = factory.open("testAlwaysInactive");