You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/08 01:52:25 UTC

[pulsar] branch master updated: Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f154e  Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)
c4f154e is described below

commit c4f154e79c03cff9055aa4e2ede7748c5952f2bc
Author: baomingyu <ba...@163.com>
AuthorDate: Thu Apr 8 09:51:39 2021 +0800

    Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)
    
    Fixes #8115
    
    Master Issue: #8115
    
    ### Motivation
    
    First point:
     Sometimes the send future does not complete successfully, so the condition below is never true and readMoreEntries is never called:
    ` if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                            readMoreEntries();
     } `
    
    Second point:
      Sometimes keyNumbers is never decremented to zero, so the broker does not start the next loop of readMoreEntries.
    Some partitioned topics then get stuck and stop pushing messages to consumers, even though the consumers still have permits.
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3107f13..d27df30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -171,6 +172,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
 
+        int currentThreadKeyNumber = groupedEntries.size();
+        if (currentThreadKeyNumber == 0) {
+            currentThreadKeyNumber = -1;
+        }
         for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
@@ -214,7 +219,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                         sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
                         getRedeliveryTracker()).addListener(future -> {
-                    if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
+                    if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
                         readMoreEntries();
                     }
                 });
@@ -223,6 +228,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                         -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
+            } else {
+                currentThreadKeyNumber = keyNumbers.decrementAndGet();
             }
         }
 
@@ -260,6 +267,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             // readMoreEntries should run regardless whether or not stuck is caused by
             // stuckConsumers for avoid stopping dispatch.
             readMoreEntries();
+        }  else if (currentThreadKeyNumber == 0) {
+            topic.getBrokerService().executor().schedule(() -> {
+                synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
+                    readMoreEntries();
+                }
+            }, 100, TimeUnit.MILLISECONDS);
         }
     }