You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/19 00:44:05 UTC

[pulsar] branch branch-2.10 updated: Fixed deadlock in key-shared dispatcher (#16660)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new aeec16f65df Fixed deadlock in key-shared dispatcher (#16660)
aeec16f65df is described below

commit aeec16f65df603e06ee8a7a4f5aacba54d5096b4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jul 18 17:43:17 2022 -0700

    Fixed deadlock in key-shared dispatcher (#16660)
---
 ...rsistentStickyKeyDispatcherMultipleConsumers.java | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3e01531fc3e..0a402d8322a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -403,13 +403,19 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     }
 
     @Override
-    public synchronized void markDeletePositionMoveForward() {
-        if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
-                && removeConsumersFromRecentJoinedConsumers()) {
-            // After we process acks, we need to check whether the mark-delete position was advanced and we can finally
-            // read more messages. It's safe to call readMoreEntries() multiple times.
-            readMoreEntries();
-        }
+    public void markDeletePositionMoveForward() {
+        // Execute the notification in a different thread to avoid a mutex chain here
+        // from the delete operation that has just completed
+        topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
+            synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
+                if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
+                        && removeConsumersFromRecentJoinedConsumers()) {
+                    // After we process acks, we need to check whether the mark-delete position was advanced and we
+                    // can finally read more messages. It's safe to call readMoreEntries() multiple times.
+                    readMoreEntries();
+                }
+            }
+        });
     }
 
     private boolean removeConsumersFromRecentJoinedConsumers() {