You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/31 06:14:27 UTC
[pulsar] 05/05: [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 05efd10278eb1a3df3f24b9a2add390283196b09
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Oct 30 22:48:43 2022 -0700
[fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 60 +++++-----
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 123 ++++++++++++++++++++-
2 files changed, 153 insertions(+), 30 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index da855197df6..4f1a376771c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1153,29 +1153,33 @@ public class ManagedCursorImpl implements ManagedCursor {
return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
}
- protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
- if (position.equals(PositionImpl.EARLIEST)) {
- position = ledger.getFirstPosition();
- } else if (position.equals(PositionImpl.LATEST)) {
- position = ledger.getLastPosition().getNext();
+ protected void internalResetCursor(PositionImpl proposedReadPosition,
+ AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
+ final PositionImpl newReadPosition;
+ if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
+ newReadPosition = ledger.getFirstPosition();
+ } else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
+ newReadPosition = ledger.getLastPosition().getNext();
+ } else {
+ newReadPosition = proposedReadPosition;
}
- log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);
+ log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name);
synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
- log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
- ledger.getName(), position, name);
+ log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}",
+ ledger.getName(), newReadPosition, name);
resetCursorCallback.resetFailed(
new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
- position);
+ newReadPosition);
return;
}
}
final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;
- final PositionImpl newPosition = position;
+ final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition);
VoidCallback finalCallback = new VoidCallback() {
@Override
@@ -1184,8 +1188,6 @@ public class ManagedCursorImpl implements ManagedCursor {
// modify mark delete and read position since we are able to persist new position for cursor
lock.writeLock().lock();
try {
- PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);
-
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
@@ -1200,34 +1202,34 @@ public class ManagedCursorImpl implements ManagedCursor {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
- long[] resetWords = newPosition.ackSet;
+ long[] resetWords = newReadPosition.ackSet;
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
- batchDeletedIndexes.put(newPosition, ackSet);
+ batchDeletedIndexes.put(newReadPosition, ackSet);
}
}
PositionImpl oldReadPosition = readPosition;
- if (oldReadPosition.compareTo(newPosition) >= 0) {
- log.info("[{}] reset position to {} before current read position {} on cursor {}",
- ledger.getName(), newPosition, oldReadPosition, name);
+ if (oldReadPosition.compareTo(newReadPosition) >= 0) {
+ log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}",
+ ledger.getName(), newReadPosition, oldReadPosition, name);
} else {
- log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
- ledger.getName(), newPosition, oldReadPosition, name);
+ log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on "
+ + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name);
}
- readPosition = newPosition;
- ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
+ readPosition = newReadPosition;
+ ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
synchronized (pendingMarkDeleteOps) {
pendingMarkDeleteOps.clear();
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
- log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
- ledger.getName(), newPosition, name);
+ log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
+ ledger.getName(), newReadPosition, name);
}
}
- callback.resetComplete(newPosition);
+ callback.resetComplete(newReadPosition);
updateLastActive();
}
@@ -1235,20 +1237,20 @@ public class ManagedCursorImpl implements ManagedCursor {
public void operationFailed(ManagedLedgerException exception) {
synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
- log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
- ledger.getName(), newPosition, name);
+ log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
+ ledger.getName(), newReadPosition, name);
}
}
callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(
- "unable to persist position for cursor reset " + newPosition.toString()), newPosition);
+ "unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition);
}
};
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
- lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
- internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+ lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null);
+ internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 0e66e76d5c3..1a8feea1e0d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -93,6 +93,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
@@ -676,7 +677,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
assertTrue(moveStatus.get());
PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1);
- assertEquals(earliestPos, cursor.getReadPosition());
+ assertEquals(cursor.getReadPosition(), earliestPos);
moveStatus.set(false);
// reset to one after last entry in a ledger should point to the first entry in the next ledger
@@ -3283,6 +3284,126 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
});
}
+ @Test(timeOut = 20000)
+ public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception { // Scenario: reset to LATEST while no entry is confirmed, add one entry, then recover the cursor.
+ ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry");
+ ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
+
+ // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+ assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+ assertEquals(c.getReadPosition().getEntryId(), 0);
+ assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+ c.resetCursor(PositionImpl.LATEST);
+
+ // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+ assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+ assertEquals(c.getReadPosition().getEntryId(), 0);
+ assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+ final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); // snapshot taken before the entry is added and before recovery
+ final Position readPositionBeforeRecover = c.getReadPosition(); // compared against the post-recovery read position below
+
+ // Trigger the lastConfirmedEntry to move forward
+ ml.addEntry(new byte[1]);
+
+ ManagedCursorInfo info = ManagedCursorInfo.newBuilder() // recovery input built from the snapshotted mark-delete position
+ .setCursorsLedgerId(c.getCursorLedger())
+ .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
+ .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
+ .setLastActive(0L)
+ .build();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ c.recoverFromLedger(info, new VoidCallback() { // both callbacks release the latch; only the failure path sets the flag
+ @Override
+ public void operationComplete() {
+ latch.countDown();
+ }
+
+ @Override
+ public void operationFailed(ManagedLedgerException exception) {
+ failed.set(true);
+ latch.countDown();
+ }
+ });
+
+ latch.await(); // wait until one of the two callbacks has run
+ if (failed.get()) {
+ fail("Cursor recovery should not fail");
+ }
+ assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); // recovery must leave the mark-delete position unchanged
+ assertEquals(c.getReadPosition(), readPositionBeforeRecover); // ... and the read position unchanged
+ assertEquals(c.getNumberOfEntries(), 1L); // expected count matches the single addEntry call made after the reset
+ }
+
+ @Test(timeOut = 20000)
+ public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception { // Scenario: reset to LATEST after four entries, add two more, then recover the cursor.
+ ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries");
+ ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
+
+ // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+ assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+ assertEquals(c.getReadPosition().getEntryId(), 0);
+ assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+ c.resetCursor(PositionImpl.LATEST);
+
+ // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
+ assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
+ assertEquals(c.getReadPosition().getEntryId(), 0);
+ assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);
+
+ // Trigger the lastConfirmedEntry to move forward
+ ml.addEntry(new byte[1]);
+ ml.addEntry(new byte[1]);
+ ml.addEntry(new byte[1]);
+ ml.addEntry(new byte[1]);
+
+ c.resetCursor(PositionImpl.LATEST); // second reset, now with four entries in the ledger
+
+ assertEquals(c.getMarkDeletedPosition().getEntryId(), 3); // mark-delete is expected on the last of the four entries (ids 0..3)
+ assertEquals(c.getReadPosition().getEntryId(), 4); // read position is expected one past the last entry
+ assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3);
+
+ // Publish messages to move the lastConfirmedEntry field forward
+ ml.addEntry(new byte[1]);
+ ml.addEntry(new byte[1]);
+
+ final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); // snapshot taken after the two extra entries, before recovery
+ final Position readPositionBeforeRecover = c.getReadPosition(); // compared against the post-recovery read position below
+
+ ManagedCursorInfo info = ManagedCursorInfo.newBuilder() // recovery input built from the snapshotted mark-delete position
+ .setCursorsLedgerId(c.getCursorLedger())
+ .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
+ .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
+ .setLastActive(0L)
+ .build();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ c.recoverFromLedger(info, new VoidCallback() { // both callbacks release the latch; only the failure path sets the flag
+ @Override
+ public void operationComplete() {
+ latch.countDown();
+ }
+
+ @Override
+ public void operationFailed(ManagedLedgerException exception) {
+ failed.set(true);
+ latch.countDown();
+ }
+ });
+
+ latch.await(); // wait until one of the two callbacks has run
+ if (failed.get()) {
+ fail("Cursor recovery should not fail");
+ }
+ assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); // recovery must leave the mark-delete position unchanged
+ assertEquals(c.getReadPosition(), readPositionBeforeRecover); // ... and the read position unchanged
+ assertEquals(c.getNumberOfEntries(), 2L); // expected count matches the two addEntry calls made after the second reset
+ }
@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");