You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/06 07:09:18 UTC
[pulsar] branch branch-2.8 updated: [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new f657bf8a36d [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
f657bf8a36d is described below
commit f657bf8a36d54241f96fb2bf68e72a5bdd1e9ef6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Apr 5 22:44:25 2022 +0300
[ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
- missed updates can lead to the subscription and consuming getting stuck
(cherry picked from commit ad2f397fb9bb1d6f90a980e68f0161ed32900309)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 31 ++++++++++++++++++----
1 file changed, 26 insertions(+), 5 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c31e304be85..d03351f2da2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1731,7 +1731,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
- lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null);
+ updateLastMarkDeleteEntryToLatest(newPosition, properties);
callback.markDeleteComplete(ctx);
return;
}
@@ -1785,7 +1785,14 @@ public class ManagedCursorImpl implements ManagedCursor {
// ledger is postponed to when the counter goes to 0.
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this);
- lastMarkDeleteEntry = mdEntry;
+ LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+ if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) {
+ // keep the current value since it's later then the mdEntry.newPosition
+ return last;
+ } else {
+ return mdEntry;
+ }
+ });
persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
@Override
@@ -2048,9 +2055,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
- PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
- LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
- last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
+ updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
callback.deleteComplete(ctx);
return;
}
@@ -2082,6 +2087,22 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
+ // update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
+ private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition,
+ final Map<String, Long> properties) {
+ LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+ if (last != null && last.newPosition.compareTo(newPosition) > 0) {
+ // keep current value, don't update
+ return last;
+ } else {
+ // use given properties or when missing, use the properties from the previous field value
+ Map<String, Long> propertiesToUse =
+ properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
+ return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
+ }
+ });
+ }
+
/**
* Given a list of entries, filter out the entries that have already been individually deleted.
*