You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/14 10:20:53 UTC
[pulsar] branch branch-2.9 updated: [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 6c2cc9bd3b8 [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
6c2cc9bd3b8 is described below
commit 6c2cc9bd3b841bd970ebf54b820469019e99997a
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 14 13:04:25 2022 +0300
[ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
- follow up on #15031
* [ML] Fix race in persisting mark delete position
* [ML] Resetting should reset lastMarkDeleteEntry
* [ML] Reset fields in initializeCursorPosition method
(cherry picked from commit a19a30a5e89bfd2c2da5460db6120c8e0a48d8f7)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 100 +++++++++++++++------
1 file changed, 75 insertions(+), 25 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b3ba61810c2..adb29bb37bb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -118,6 +118,11 @@ public class ManagedCursorImpl implements ManagedCursor {
// this position is have persistent mark delete position
protected volatile PositionImpl persistentMarkDeletePosition;
+ protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl>
+ INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class,
+ "inProgressMarkDeletePersistPosition");
+ protected volatile PositionImpl inProgressMarkDeletePersistPosition;
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
@@ -214,6 +219,31 @@ public class ManagedCursorImpl implements ManagedCursor {
this.callback = callback;
this.ctx = ctx;
}
+
+ public void triggerComplete() {
+ // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
+ // will ensure that no race condition will happen between the next mark-delete and the switching
+ // operation.
+ if (callbackGroup != null) {
+ // Trigger the callback for every request in the group
+ for (MarkDeleteEntry e : callbackGroup) {
+ e.callback.markDeleteComplete(e.ctx);
+ }
+ } else if (callback != null) {
+ // Only trigger the callback for the current request
+ callback.markDeleteComplete(ctx);
+ }
+ }
+
+ public void triggerFailed(ManagedLedgerException exception) {
+ if (callbackGroup != null) {
+ for (MarkDeleteEntry e : callbackGroup) {
+ e.callback.markDeleteFailed(exception, e.ctx);
+ }
+ } else if (callback != null) {
+ callback.markDeleteFailed(exception, ctx);
+ }
+ }
}
protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
@@ -540,6 +570,7 @@ public class ManagedCursorImpl implements ManagedCursor {
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
+ inProgressMarkDeletePersistPosition = null;
readPosition = ledger.getNextValidPosition(position);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
@@ -1121,7 +1152,11 @@ public class ManagedCursorImpl implements ManagedCursor {
};
- internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
+ persistentMarkDeletePosition = null;
+ inProgressMarkDeletePersistPosition = null;
+ lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
+ internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+ new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
finalCallback.operationComplete();
@@ -1567,6 +1602,9 @@ public class ManagedCursorImpl implements ManagedCursor {
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
markDeletePosition = lastPositionCounter.getLeft();
+ lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
+ persistentMarkDeletePosition = null;
+ inProgressMarkDeletePersistPosition = null;
// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
@@ -1781,6 +1819,34 @@ public class ManagedCursorImpl implements ManagedCursor {
}
void internalMarkDelete(final MarkDeleteEntry mdEntry) {
+ if (persistentMarkDeletePosition != null
+ && mdEntry.newPosition.compareTo(persistentMarkDeletePosition) < 0) {
+ if (log.isInfoEnabled()) {
+ log.info("Skipping updating mark delete position to {}. The persisted mark delete position {} "
+ + "is later.", mdEntry.newPosition, persistentMarkDeletePosition);
+ }
+ mdEntry.triggerComplete();
+ return;
+ }
+
+ PositionImpl inProgressLatest = INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.updateAndGet(this, current -> {
+ if (current != null && current.compareTo(mdEntry.newPosition) > 0) {
+ return current;
+ } else {
+ return mdEntry.newPosition;
+ }
+ });
+
+ // if there's a newer or equal mark delete update in progress, skip it.
+ if (inProgressLatest != mdEntry.newPosition) {
+ if (log.isInfoEnabled()) {
+ log.info("Skipping updating mark delete position to {}. The mark delete position update "
+ + "in progress {} is later.", mdEntry.newPosition, inProgressLatest);
+ }
+ mdEntry.triggerComplete();
+ return;
+ }
+
// The counter is used to mark all the pending mark-delete request that were submitted to BK and that are not
// yet finished. While we have outstanding requests we cannot close the current ledger, so the switch to new
// ledger is postponed to when the counter goes to 0.
@@ -1803,6 +1869,9 @@ public class ManagedCursorImpl implements ManagedCursor {
mdEntry.newPosition);
}
+ INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.compareAndSet(ManagedCursorImpl.this,
+ mdEntry.newPosition, null);
+
// Remove from the individual deleted messages all the entries before the new mark delete
// point.
lock.writeLock().lock();
@@ -1814,11 +1883,7 @@ public class ManagedCursorImpl implements ManagedCursor {
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
}
- if (persistentMarkDeletePosition == null
- || mdEntry.newPosition.compareTo(persistentMarkDeletePosition) > 0) {
- persistentMarkDeletePosition = mdEntry.newPosition;
- }
-
+ persistentMarkDeletePosition = mdEntry.newPosition;
} finally {
lock.writeLock().unlock();
}
@@ -1827,22 +1892,13 @@ public class ManagedCursorImpl implements ManagedCursor {
decrementPendingMarkDeleteCount();
- // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
- // will ensure that no race condition will happen between the next mark-delete and the switching
- // operation.
- if (mdEntry.callbackGroup != null) {
- // Trigger the callback for every request in the group
- for (MarkDeleteEntry e : mdEntry.callbackGroup) {
- e.callback.markDeleteComplete(e.ctx);
- }
- } else {
- // Only trigger the callback for the current request
- mdEntry.callback.markDeleteComplete(mdEntry.ctx);
- }
+ mdEntry.triggerComplete();
}
@Override
public void operationFailed(ManagedLedgerException exception) {
+ INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.compareAndSet(ManagedCursorImpl.this,
+ mdEntry.newPosition, null);
isDirty = true;
log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
ManagedCursorImpl.this, mdEntry.newPosition);
@@ -1853,13 +1909,7 @@ public class ManagedCursorImpl implements ManagedCursor {
decrementPendingMarkDeleteCount();
- if (mdEntry.callbackGroup != null) {
- for (MarkDeleteEntry e : mdEntry.callbackGroup) {
- e.callback.markDeleteFailed(exception, e.ctx);
- }
- } else {
- mdEntry.callback.markDeleteFailed(exception, mdEntry.ctx);
- }
+ mdEntry.triggerFailed(exception);
}
});
}