You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/14 10:47:20 UTC

[pulsar] branch branch-2.7 updated: [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 29aac5ade46 [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
29aac5ade46 is described below

commit 29aac5ade4603dbf2ca6fbe2f8f70437e230744f
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Thu Apr 14 13:39:17 2022 +0300

    [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
    
    - follow up on #15031
    * [ML] Fix race in persisting mark delete position
    * [ML] Resetting should reset lastMarkDeleteEntry
    * [ML] Reset fields in initializeCursorPosition method
    
    (cherry picked from commit a19a30a5e89bfd2c2da5460db6120c8e0a48d8f7)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 48 +++++++++++++---------
 1 file changed, 29 insertions(+), 19 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 87fe0b94c26..b932ebb78eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -207,6 +207,31 @@ public class ManagedCursorImpl implements ManagedCursor {
             this.callback = callback;
             this.ctx = ctx;
         }
+
+        public void triggerComplete() {
+            // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
+            // will ensure that no race condition will happen between the next mark-delete and the switching
+            // operation.
+            if (callbackGroup != null) {
+                // Trigger the callback for every request in the group
+                for (MarkDeleteEntry e : callbackGroup) {
+                    e.callback.markDeleteComplete(e.ctx);
+                }
+            } else if (callback != null) {
+                // Only trigger the callback for the current request
+                callback.markDeleteComplete(ctx);
+            }
+        }
+
+        public void triggerFailed(ManagedLedgerException exception) {
+            if (callbackGroup != null) {
+                for (MarkDeleteEntry e : callbackGroup) {
+                    e.callback.markDeleteFailed(exception, e.ctx);
+                }
+            } else if (callback != null) {
+                callback.markDeleteFailed(exception, ctx);
+            }
+        }
     }
 
     protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
@@ -1058,6 +1083,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         };
 
+        lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
         internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
@@ -1503,6 +1529,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
         readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
         markDeletePosition = lastPositionCounter.getLeft();
+        lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
 
         // Initialize the counter such that the difference between the messages written on the ML and the
         // messagesConsumed is 0, to ensure the initial backlog count is 0.
@@ -1741,18 +1768,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 decrementPendingMarkDeleteCount();
 
-                // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
-                // will ensure that no race condition will happen between the next mark-delete and the switching
-                // operation.
-                if (mdEntry.callbackGroup != null) {
-                    // Trigger the callback for every request in the group
-                    for (MarkDeleteEntry e : mdEntry.callbackGroup) {
-                        e.callback.markDeleteComplete(e.ctx);
-                    }
-                } else {
-                    // Only trigger the callback for the current request
-                    mdEntry.callback.markDeleteComplete(mdEntry.ctx);
-                }
+                mdEntry.triggerComplete();
             }
 
             @Override
@@ -1767,13 +1783,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 decrementPendingMarkDeleteCount();
 
-                if (mdEntry.callbackGroup != null) {
-                    for (MarkDeleteEntry e : mdEntry.callbackGroup) {
-                        e.callback.markDeleteFailed(exception, e.ctx);
-                    }
-                } else {
-                    mdEntry.callback.markDeleteFailed(exception, mdEntry.ctx);
-                }
+                mdEntry.triggerFailed(exception);
             }
         });
     }