You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/12 06:35:44 UTC

[pulsar] branch branch-2.6 updated: Fix race condition when call acknowledgementWasProcessed() (#8499)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 7cf2d12  Fix race condition when call acknowledgementWasProcessed() (#8499)
7cf2d12 is described below

commit 7cf2d12a45b20ce9bd505dcfb45165fe39af9e40
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Nov 12 14:31:15 2020 +0800

    Fix race condition when call acknowledgementWasProcessed() (#8499)
    
    1. Fix a race condition when calling `acknowledgementWasProcessed()`. Currently, when the broker receives a message acknowledgement request, it calls the ManagedCursor to delete the position via `cursor.asyncDelete()`. We should call `acknowledgementWasProcessed()` in the delete-position callback. Otherwise, the dispatcher for the Key_Shared subscription may see an unchanged mark-delete position, so in some cases the consumer can't be removed from the `recentJoinedCo [...]
    
    2. Currently, a consumer is removed from `recentJoinedConsumers` only once the mark-delete position is greater than the read position at which the consumer joined and there are new messages to deliver to this consumer. But in some cases there are no new messages to deliver to this consumer, so it can never be removed from `recentJoinedConsumers`. This also causes consumption to get stuck.
    
    (cherry picked from commit 8df73645308c9f5599836dcfa355097fd0f9c216)
---
 .../apache/pulsar/broker/service/Dispatcher.java   |  2 +-
 .../persistent/PersistentMessageExpiryMonitor.java |  2 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 39 ++++++++++++++--------
 .../service/persistent/PersistentSubscription.java | 28 ++++++++++------
 4 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index e141cf7..2bd053f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -112,7 +112,7 @@ public interface Dispatcher {
         //No-op
     }
 
-    default void acknowledgementWasProcessed() {
+    default void markDeletePositionMoveForward() {
         // No-op
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 02d5a52..0380c32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -107,7 +107,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
             updateRates();
             // If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
             if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
-                subscription.getDispatcher().acknowledgementWasProcessed();
+                subscription.getDispatcher().markDeletePositionMoveForward();
             }
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Mark deleted {} messages", topicName, subName, numMessagesExpired);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 6180e77..1f3d9f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -127,6 +128,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             if (consumerList.size() == 1) {
                 recentlyJoinedConsumers.clear();
             }
+            if (removeConsumersFromRecentJoinedConsumers() || messagesToRedeliver.size() > 0) {
+                readMoreEntries();
+            }
         }
     }
 
@@ -260,12 +264,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             nextStuckConsumers.add(consumer);
             return 0;
         }
-
         if (recentlyJoinedConsumers == null) {
             return maxMessages;
         }
-
+        removeConsumersFromRecentJoinedConsumers();
         PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
+        // At this point, all the old messages were already consumed and this consumer
+        // is now ready to receive any message
         if (maxReadPosition == null) {
             // stop to dispatch by stuckConsumers
             if (stuckConsumers.contains(consumer)) {
@@ -278,15 +283,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return maxMessages;
         }
 
-        PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
-
-        if (maxReadPosition.compareTo(markDeletePosition.getNext()) <= 0) {
-            // At this point, all the old messages were already consumed and this consumer
-            // is now ready to receive any message
-            recentlyJoinedConsumers.remove(consumer);
-            return maxMessages;
-        }
-
         // If the read type is Replay, we should avoid send messages that hold by other consumer to the new consumers,
         // For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9]
         // If the consumer0 get message 0 and 1, and does not acked message 0, then consumer1 joined,
@@ -317,14 +313,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     }
 
     @Override
-    public synchronized void acknowledgementWasProcessed() {
-        if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) {
+    public synchronized void markDeletePositionMoveForward() {
+        if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
+                && removeConsumersFromRecentJoinedConsumers()) {
             // After we process acks, we need to check whether the mark-delete position was advanced and we can finally
             // read more messages. It's safe to call readMoreEntries() multiple times.
             readMoreEntries();
         }
     }
 
+    private boolean removeConsumersFromRecentJoinedConsumers() {
+        Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
+        PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
+        boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
+        while (itr.hasNext()) {
+            Map.Entry<Consumer, PositionImpl> entry = itr.next();
+            if (entry.getValue().compareTo(mdp) <= 0) {
+                itr.remove();
+                hasConsumerRemovedFromTheRecentJoinedConsumers = true;
+            } else {
+                break;
+            }
+        }
+        return hasConsumerRemovedFromTheRecentJoinedConsumers;
+    }
+
     protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
             // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index bc38db3..2d6533d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -326,7 +326,7 @@ public class PersistentSubscription implements Subscription {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
             }
-            cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, position);
+            cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, previousMarkDeletePosition);
 
         } else {
             if (log.isDebugEnabled()) {
@@ -356,9 +356,9 @@ public class PersistentSubscription implements Subscription {
                         return true;
                     }).collect(Collectors.toList());
                 }
-                cursor.asyncDelete(positionsSafeToAck, deleteCallback, positionsSafeToAck);
+                cursor.asyncDelete(positionsSafeToAck, deleteCallback, previousMarkDeletePosition);
             } else {
-                cursor.asyncDelete(positions, deleteCallback, positions);
+                cursor.asyncDelete(positions, deleteCallback, previousMarkDeletePosition);
             }
 
             if(dispatcher != null){
@@ -385,11 +385,6 @@ public class PersistentSubscription implements Subscription {
                 dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
             }
         }
-
-        // Signal the dispatchers to give chance to take extra actions
-        if(dispatcher != null){
-            dispatcher.acknowledgementWasProcessed();
-        }
     }
 
     /**
@@ -499,10 +494,13 @@ public class PersistentSubscription implements Subscription {
     private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
         @Override
         public void markDeleteComplete(Object ctx) {
-            PositionImpl pos = (PositionImpl) ctx;
+            PositionImpl oldMD = (PositionImpl) ctx;
+            PositionImpl newMD = (PositionImpl) cursor.getMarkDeletedPosition();
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Mark deleted messages until position {}", topicName, subName, pos);
+                log.debug("[{}][{}] Mark deleted messages to position {} from position {}", topicName, subName, newMD, oldMD);
             }
+            // Signal the dispatchers to give chance to take extra actions
+            notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
         }
 
         @Override
@@ -520,6 +518,8 @@ public class PersistentSubscription implements Subscription {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
             }
+            // Signal the dispatchers to give chance to take extra actions
+            notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
         }
 
         @Override
@@ -528,6 +528,14 @@ public class PersistentSubscription implements Subscription {
         }
     };
 
+    private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) {
+        PositionImpl oldMD = (PositionImpl) oldPosition;
+        PositionImpl newMD = (PositionImpl) cursor.getMarkDeletedPosition();
+        if(dispatcher != null && newMD.compareTo(oldMD) > 0){
+            dispatcher.markDeletePositionMoveForward();
+        }
+    }
+
     @Override
     public String toString() {
         return fullName;