You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/14 10:47:20 UTC
[pulsar] branch branch-2.7 updated: [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 29aac5ade46 [ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
29aac5ade46 is described below
commit 29aac5ade4603dbf2ca6fbe2f8f70437e230744f
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Thu Apr 14 13:39:17 2022 +0300
[ML] Follow up on race condition fixes in ManagedCursorImpl #15031 (#15067)
- follow up on #15031
* [ML] Fix race in persisting mark delete position
* [ML] Resetting should reset lastMarkDeleteEntry
* [ML] Reset fields in initializeCursorPosition method
(cherry picked from commit a19a30a5e89bfd2c2da5460db6120c8e0a48d8f7)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 48 +++++++++++++---------
1 file changed, 29 insertions(+), 19 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 87fe0b94c26..b932ebb78eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -207,6 +207,31 @@ public class ManagedCursorImpl implements ManagedCursor {
this.callback = callback;
this.ctx = ctx;
}
+
+ public void triggerComplete() {
+ // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
+ // will ensure that no race condition will happen between the next mark-delete and the switching
+ // operation.
+ if (callbackGroup != null) {
+ // Trigger the callback for every request in the group
+ for (MarkDeleteEntry e : callbackGroup) {
+ e.callback.markDeleteComplete(e.ctx);
+ }
+ } else if (callback != null) {
+ // Only trigger the callback for the current request
+ callback.markDeleteComplete(ctx);
+ }
+ }
+
+ public void triggerFailed(ManagedLedgerException exception) {
+ if (callbackGroup != null) {
+ for (MarkDeleteEntry e : callbackGroup) {
+ e.callback.markDeleteFailed(exception, e.ctx);
+ }
+ } else if (callback != null) {
+ callback.markDeleteFailed(exception, ctx);
+ }
+ }
}
protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
@@ -1058,6 +1083,7 @@ public class ManagedCursorImpl implements ManagedCursor {
};
+ lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
@@ -1503,6 +1529,7 @@ public class ManagedCursorImpl implements ManagedCursor {
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
markDeletePosition = lastPositionCounter.getLeft();
+ lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
@@ -1741,18 +1768,7 @@ public class ManagedCursorImpl implements ManagedCursor {
decrementPendingMarkDeleteCount();
- // Trigger the final callback after having (eventually) triggered the switchin-ledger operation. This
- // will ensure that no race condition will happen between the next mark-delete and the switching
- // operation.
- if (mdEntry.callbackGroup != null) {
- // Trigger the callback for every request in the group
- for (MarkDeleteEntry e : mdEntry.callbackGroup) {
- e.callback.markDeleteComplete(e.ctx);
- }
- } else {
- // Only trigger the callback for the current request
- mdEntry.callback.markDeleteComplete(mdEntry.ctx);
- }
+ mdEntry.triggerComplete();
}
@Override
@@ -1767,13 +1783,7 @@ public class ManagedCursorImpl implements ManagedCursor {
decrementPendingMarkDeleteCount();
- if (mdEntry.callbackGroup != null) {
- for (MarkDeleteEntry e : mdEntry.callbackGroup) {
- e.callback.markDeleteFailed(exception, e.ctx);
- }
- } else {
- mdEntry.callback.markDeleteFailed(exception, mdEntry.ctx);
- }
+ mdEntry.triggerFailed(exception);
}
});
}