You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/31 14:05:39 UTC

[incubator-pulsar] branch master updated: Fix race condition in skipEntries that moves readPosition and markDeletePosition incorrectly (#1478)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2eda972  Fix race condition in skipEntries that moves readPosition and markDeletePosition incorrectly (#1478)
2eda972 is described below

commit 2eda9725ecb0ec494512bf3725839f0d24bf1d0b
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat Mar 31 07:05:36 2018 -0700

    Fix race condition in skipEntries that moves readPosition and markDeletePosition incorrectly (#1478)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 +++++++++++-----------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9b3d9cd..29fe0a7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1245,18 +1245,6 @@ public class ManagedCursorImpl implements ManagedCursor {
             throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
         }
 
-        if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
-            // If the position that is mark-deleted is past the read position, it
-            // means that the client has skipped some entries. We need to move
-            // read position forward
-            PositionImpl oldReadPosition = readPosition;
-            readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
-
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Moved read position from: {} to: {}", ledger.getName(), oldReadPosition, readPosition);
-            }
-        }
-
         PositionImpl oldMarkDeletePosition = markDeletePosition;
 
         if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
@@ -1285,6 +1273,19 @@ public class ManagedCursorImpl implements ManagedCursor {
         // markDelete-position and clear out deletedMsgSet
         markDeletePosition = PositionImpl.get(newMarkDeletePosition);
         individualDeletedMessages.remove(Range.atMost(markDeletePosition));
+        
+        if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
+            // If the position that is mark-deleted is past the read position, it
+            // means that the client has skipped some entries. We need to move
+            // read position forward
+            PositionImpl oldReadPosition = readPosition;
+            readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
+                        oldReadPosition, readPosition, markDeletePosition);
+            }
+        }
 
         return newMarkDeletePosition;
     }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.