Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/01 11:42:58 UTC

[GitHub] [pulsar] baomingyu commented on a change in pull request #10285: fix issue 10284 Occasional consumer stucked when restart consumer whit key_shared subscription type

baomingyu commented on a change in pull request #10285:
URL: https://github.com/apache/pulsar/pull/10285#discussion_r643025169



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -322,17 +325,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
                 maxReadPosition = minReadPositionForRecentJoinedConsumer;
             }
         }
+
         // Here, the consumer is one that has recently joined, so we can only send messages that were
         // published before it has joined.
+        int messageNum = 0;
         for (int i = 0; i < maxMessages; i++) {
             if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                 // We have already crossed the divider line. All messages in the list are now
                 // newer than what we can currently dispatch to this consumer
-                return i;
+                messageNum  = i;
+                break;
             }
         }
 
-        return maxMessages;
+        if (messageNum == 0) {
+            messageNum = maxMessages;
+            CopyOnWriteArrayList<Consumer> consumers = this.getConsumers();
+            for (Consumer consumer1 : consumers) {
+                if (isApendMessage(consumer1, entries.get(0).getLedgerId(),
+                        entries.get(0).getEntryId())) {
+                    messageNum = 0;
+                    break;
+                }
+            }
+        }
+        return messageNum;
+    }
+
+    private boolean isApendMessage(Consumer consuemr, long ledgerId, long entryId) {
+        if (consuemr != null && consuemr.getPendingAcks().get(ledgerId, entryId) != null) {

Review comment:
       > @baomingyu The description of this PR does not seem to explain how the problem is fixed. Could you please provide more context about why `getRestrictedMaxEntriesForConsumer` will always return 0 after dispatching messages to a consumer without any flow permits?
   
   This problem is mainly caused by the time gap between creating a consumer and that consumer obtaining permits; the two steps are not processed at the same time.
   For example, when messages are dispatched for the first time, the consumer list contains consumer A and consumer B. Consumer A has 1000 permits, while consumer B has not received the flow command yet, so it has 0 permits. Messages are now dispatched in key_shared mode. Suppose all of these messages are assigned to consumer B by key. Because consumer B has no permits, the messages are not actually pushed to it. In the next round of dispatching, consumer B has received the flow command and its permits have changed to 1000. Without the code change, the dispatcher only compares against the position reached in the previous round, so it still does not push the messages to consumer B. The symptom is that the broker holds messages, but they are not pushed to the consumer.
   
   The fix is:
   When `getRestrictedMaxEntriesForConsumer` reaches the point where messageNum == 0, it needs to check whether the current message has already been successfully pushed to a consumer. If it has not been pushed to any consumer, it needs to be pushed again.
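
   To make the check easier to follow, here is a minimal standalone sketch of the logic. It is not the broker code: `KeySharedSketch`, `SketchConsumer`, `markSent`, `hasPending` and `restrictedMaxEntries` are hypothetical names used only for illustration, positions are modelled as `{ledgerId, entryId}` arrays, and it assumes 1 <= maxMessages <= entries.size().

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the check described above; these are not Pulsar classes.
final class KeySharedSketch {

    // Stand-in for a consumer: it only tracks entries that were sent but not yet acked.
    static final class SketchConsumer {
        private final Set<String> pendingAcks = new HashSet<>();

        void markSent(long ledgerId, long entryId) {
            pendingAcks.add(ledgerId + ":" + entryId);
        }

        boolean hasPending(long ledgerId, long entryId) {
            return pendingAcks.contains(ledgerId + ":" + entryId);
        }
    }

    // Orders two {ledgerId, entryId} positions: by ledger first, then by entry.
    static int compare(long[] a, long[] b) {
        int byLedger = Long.compare(a[0], b[0]);
        return byLedger != 0 ? byLedger : Long.compare(a[1], b[1]);
    }

    // Assumes 1 <= maxMessages <= entries.size().
    static int restrictedMaxEntries(List<long[]> entries, int maxMessages,
                                    long[] maxReadPosition, List<SketchConsumer> consumers) {
        int messageNum = 0;
        for (int i = 0; i < maxMessages; i++) {
            if (compare(entries.get(i), maxReadPosition) >= 0) {
                // Entry i and everything after it is at or beyond the divider line.
                messageNum = i;
                break;
            }
        }
        if (messageNum == 0) {
            // Either no entry crossed the line, or the very first one did.
            // Hold back only if the first entry is already pending on some consumer.
            messageNum = maxMessages;
            long[] first = entries.get(0);
            for (SketchConsumer c : consumers) {
                if (c.hasPending(first[0], first[1])) {
                    messageNum = 0;
                    break;
                }
            }
        }
        return messageNum;
    }

    public static void main(String[] args) {
        List<long[]> entries = Arrays.asList(
                new long[]{1, 0}, new long[]{1, 1}, new long[]{1, 2});
        SketchConsumer a = new SketchConsumer();
        SketchConsumer b = new SketchConsumer();
        List<SketchConsumer> consumers = Arrays.asList(a, b);

        // First entry sits on the divider line but was never sent to anyone: dispatch again.
        System.out.println(restrictedMaxEntries(entries, 3, new long[]{1, 0}, consumers)); // 3

        // Once some consumer holds the first entry as pending, nothing is dispatched.
        a.markSent(1, 0);
        System.out.println(restrictedMaxEntries(entries, 3, new long[]{1, 0}, consumers)); // 0

        // Divider line at the third entry: only the first two entries may be sent.
        System.out.println(restrictedMaxEntries(entries, 3, new long[]{1, 2}, consumers)); // 2
    }
}
```

   In this sketch, the first call corresponds to the stuck case from the example: the entries were selected for a consumer without permits, so no consumer has them in its pending acks and they are offered again.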
   



