You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/06 07:05:06 UTC

[pulsar] branch branch-2.9 updated: [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new adb49581213 [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
adb49581213 is described below

commit adb4958121349aecc3e4e02e9fe6404debe2b4cb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Apr 5 22:44:25 2022 +0300

    [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
    
    - missed updates can cause the subscription and message consumption to get stuck
    
    (cherry picked from commit ad2f397fb9bb1d6f90a980e68f0161ed32900309)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 31 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f882444a4a8..b3ba61810c2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1732,7 +1732,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
             isDirty = true;
-            lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null);
+            updateLastMarkDeleteEntryToLatest(newPosition, properties);
             callback.markDeleteComplete(ctx);
             return;
         }
@@ -1786,7 +1786,14 @@ public class ManagedCursorImpl implements ManagedCursor {
         // ledger is postponed to when the counter goes to 0.
         PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this);
 
-        lastMarkDeleteEntry = mdEntry;
+        LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+            if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) {
+                // keep the current value since it's later than mdEntry.newPosition
+                return last;
+            } else {
+                return mdEntry;
+            }
+        });
 
         persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
             @Override
@@ -2049,9 +2056,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
             isDirty = true;
-            PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
-            LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
-                    last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
+            updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
             callback.deleteComplete(ctx);
             return;
         }
@@ -2083,6 +2088,22 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    // update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
+    private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition,
+                                                   final Map<String, Long> properties) {
+        LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+            if (last != null && last.newPosition.compareTo(newPosition) > 0) {
+                // keep current value, don't update
+                return last;
+            } else {
+                // use the given properties or, when missing, the properties from the previous field value
+                Map<String, Long> propertiesToUse =
+                        properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
+                return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
+            }
+        });
+    }
+
     /**
      * Given a list of entries, filter out the entries that have already been individually deleted.
      *