You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/10/19 19:04:54 UTC
[pulsar] branch master updated: Fix race condition in updating
readPosition in ManagedCursorImpl.setAcknowledgePosition (#8299)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f4a4c3 Fix race condition in updating readPosition in ManagedCursorImpl.setAcknowledgePosition (#8299)
6f4a4c3 is described below
commit 6f4a4c38bcdd08018653e6b7bceb281980574b55
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Oct 19 22:03:26 2020 +0300
Fix race condition in updating readPosition in ManagedCursorImpl.setAcknowledgePosition (#8299)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 ++++++++++++----------
1 file changed, 14 insertions(+), 11 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index df364ff..2aa02eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1534,18 +1534,21 @@ public class ManagedCursorImpl implements ManagedCursor {
markDeletePosition = newMarkDeletePosition;
individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
- if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
- // If the position that is mark-deleted is past the read position, it
- // means that the client has skipped some entries. We need to move
- // read position forward
- PositionImpl oldReadPosition = readPosition;
- readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
- oldReadPosition, readPosition, markDeletePosition);
+ READ_POSITION_UPDATER.updateAndGet(this, currentReadPosition -> {
+ if (currentReadPosition.compareTo(markDeletePosition) <= 0) {
+ // If the position that is mark-deleted is past the read position, it
+ // means that the client has skipped some entries. We need to move
+ // read position forward
+ PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
+ currentReadPosition, newReadPosition, markDeletePosition);
+ }
+ return newReadPosition;
+ } else {
+ return currentReadPosition;
}
- }
+ });
return newMarkDeletePosition;
}