You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/14 10:20:53 UTC

[pulsar] branch branch-2.9 updated: [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 6c2cc9bd3b8 [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
6c2cc9bd3b8 is described below

commit 6c2cc9bd3b841bd970ebf54b820469019e99997a
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 14 13:04:25 2022 +0300

    [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
    
    - follow up on #15031
    * [ML] Fix race in persisting mark delete position
    * [ML] Resetting should reset lastMarkDeleteEntry
    * [ML] Reset fields in initializeCursorPosition method
    
    (cherry picked from commit a19a30a5e89bfd2c2da5460db6120c8e0a48d8f7)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 100 +++++++++++++++------
 1 file changed, 75 insertions(+), 25 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b3ba61810c2..adb29bb37bb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -118,6 +118,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     // this position is have persistent mark delete position
     protected volatile PositionImpl persistentMarkDeletePosition;
+    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl>
+            INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class,
+                    "inProgressMarkDeletePersistPosition");
+    protected volatile PositionImpl inProgressMarkDeletePersistPosition;
 
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
@@ -214,6 +219,31 @@ public class ManagedCursorImpl implements ManagedCursor {
             this.callback = callback;
             this.ctx = ctx;
         }
+
+        public void triggerComplete() {
+            // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
+            // will ensure that no race condition will happen between the next mark-delete and the switching
+            // operation.
+            if (callbackGroup != null) {
+                // Trigger the callback for every request in the group
+                for (MarkDeleteEntry e : callbackGroup) {
+                    e.callback.markDeleteComplete(e.ctx);
+                }
+            } else if (callback != null) {
+                // Only trigger the callback for the current request
+                callback.markDeleteComplete(ctx);
+            }
+        }
+
+        public void triggerFailed(ManagedLedgerException exception) {
+            if (callbackGroup != null) {
+                for (MarkDeleteEntry e : callbackGroup) {
+                    e.callback.markDeleteFailed(exception, e.ctx);
+                }
+            } else if (callback != null) {
+                callback.markDeleteFailed(exception, ctx);
+            }
+        }
     }
 
     protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
@@ -540,6 +570,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
         markDeletePosition = position;
         persistentMarkDeletePosition = position;
+        inProgressMarkDeletePersistPosition = null;
         readPosition = ledger.getNextValidPosition(position);
         lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
         // assign cursor-ledger so, it can be deleted when new ledger will be switched
@@ -1121,7 +1152,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         };
 
-        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
+        persistentMarkDeletePosition = null;
+        inProgressMarkDeletePersistPosition = null;
+        lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
+        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+                new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 finalCallback.operationComplete();
@@ -1567,6 +1602,9 @@ public class ManagedCursorImpl implements ManagedCursor {
     void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
         readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
         markDeletePosition = lastPositionCounter.getLeft();
+        lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
+        persistentMarkDeletePosition = null;
+        inProgressMarkDeletePersistPosition = null;
 
         // Initialize the counter such that the difference between the messages written on the ML and the
         // messagesConsumed is 0, to ensure the initial backlog count is 0.
@@ -1781,6 +1819,34 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     void internalMarkDelete(final MarkDeleteEntry mdEntry) {
+        if (persistentMarkDeletePosition != null
+                && mdEntry.newPosition.compareTo(persistentMarkDeletePosition) < 0) {
+            if (log.isInfoEnabled()) {
+                log.info("Skipping updating mark delete position to {}. The persisted mark delete position {} "
+                        + "is later.", mdEntry.newPosition, persistentMarkDeletePosition);
+            }
+            mdEntry.triggerComplete();
+            return;
+        }
+
+        PositionImpl inProgressLatest = INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.updateAndGet(this, current -> {
+            if (current != null && current.compareTo(mdEntry.newPosition) > 0) {
+                return current;
+            } else {
+                return mdEntry.newPosition;
+            }
+        });
+
+        // if there's a newer or equal mark delete update in progress, skip it.
+        if (inProgressLatest != mdEntry.newPosition) {
+            if (log.isInfoEnabled()) {
+                log.info("Skipping updating mark delete position to {}. The mark delete position update "
+                        + "in progress {} is later.", mdEntry.newPosition, inProgressLatest);
+            }
+            mdEntry.triggerComplete();
+            return;
+        }
+
         // The counter is used to mark all the pending mark-delete request that were submitted to BK and that are not
         // yet finished. While we have outstanding requests we cannot close the current ledger, so the switch to new
         // ledger is postponed to when the counter goes to 0.
@@ -1803,6 +1869,9 @@ public class ManagedCursorImpl implements ManagedCursor {
                             mdEntry.newPosition);
                 }
 
+                INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.compareAndSet(ManagedCursorImpl.this,
+                        mdEntry.newPosition, null);
+
                 // Remove from the individual deleted messages all the entries before the new mark delete
                 // point.
                 lock.writeLock().lock();
@@ -1814,11 +1883,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         subMap.values().forEach(BitSetRecyclable::recycle);
                         subMap.clear();
                     }
-                    if (persistentMarkDeletePosition == null
-                            || mdEntry.newPosition.compareTo(persistentMarkDeletePosition) > 0) {
-                        persistentMarkDeletePosition = mdEntry.newPosition;
-                    }
-
+                    persistentMarkDeletePosition = mdEntry.newPosition;
                 } finally {
                     lock.writeLock().unlock();
                 }
@@ -1827,22 +1892,13 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 decrementPendingMarkDeleteCount();
 
-                // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
-                // will ensure that no race condition will happen between the next mark-delete and the switching
-                // operation.
-                if (mdEntry.callbackGroup != null) {
-                    // Trigger the callback for every request in the group
-                    for (MarkDeleteEntry e : mdEntry.callbackGroup) {
-                        e.callback.markDeleteComplete(e.ctx);
-                    }
-                } else {
-                    // Only trigger the callback for the current request
-                    mdEntry.callback.markDeleteComplete(mdEntry.ctx);
-                }
+                mdEntry.triggerComplete();
             }
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
+                INPROGRESS_MARKDELETE_PERSIST_POSITION_UPDATER.compareAndSet(ManagedCursorImpl.this,
+                        mdEntry.newPosition, null);
                 isDirty = true;
                 log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
                         ManagedCursorImpl.this, mdEntry.newPosition);
@@ -1853,13 +1909,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 decrementPendingMarkDeleteCount();
 
-                if (mdEntry.callbackGroup != null) {
-                    for (MarkDeleteEntry e : mdEntry.callbackGroup) {
-                        e.callback.markDeleteFailed(exception, e.ctx);
-                    }
-                } else {
-                    mdEntry.callback.markDeleteFailed(exception, mdEntry.ctx);
-                }
+                mdEntry.triggerFailed(exception);
             }
         });
     }