You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/10/19 19:04:54 UTC

[pulsar] branch master updated: Fix race condition in updating readPosition in ManagedCursorImpl.setAcknowledgePosition (#8299)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f4a4c3  Fix race condition in updating readPosition in ManagedCursorImpl.setAcknowledgePosition (#8299)
6f4a4c3 is described below

commit 6f4a4c38bcdd08018653e6b7bceb281980574b55
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Oct 19 22:03:26 2020 +0300

    Fix race condition in updating readPosition in ManagedCursorImpl.setAcknowledgePosition (#8299)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index df364ff..2aa02eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1534,18 +1534,21 @@ public class ManagedCursorImpl implements ManagedCursor {
         markDeletePosition = newMarkDeletePosition;
         individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
 
-        if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
-            // If the position that is mark-deleted is past the read position, it
-            // means that the client has skipped some entries. We need to move
-            // read position forward
-            PositionImpl oldReadPosition = readPosition;
-            readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
-
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
-                        oldReadPosition, readPosition, markDeletePosition);
+        READ_POSITION_UPDATER.updateAndGet(this, currentReadPosition -> {
+            if (currentReadPosition.compareTo(markDeletePosition) <= 0) {
+                // If the position that is mark-deleted is past the read position, it
+                // means that the client has skipped some entries. We need to move
+                // read position forward
+                PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
+                            currentReadPosition, newReadPosition, markDeletePosition);
+                }
+                return newReadPosition;
+            } else {
+                return currentReadPosition;
             }
-        }
+        });
 
         return newMarkDeletePosition;
     }