You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/01 03:23:00 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #10285: fix issue 10284 Occasional consumer stuck when restarting consumer with key_shared subscription type

codelipenghui commented on a change in pull request #10285:
URL: https://github.com/apache/pulsar/pull/10285#discussion_r642755201



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -322,17 +325,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
                 maxReadPosition = minReadPositionForRecentJoinedConsumer;
             }
         }
+
         // Here, the consumer is one that has recently joined, so we can only send messages that were
         // published before it has joined.
+        int messageNum = 0;
         for (int i = 0; i < maxMessages; i++) {
             if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                 // We have already crossed the divider line. All messages in the list are now
                 // newer than what we can currently dispatch to this consumer
-                return i;
+                messageNum  = i;
+                break;
             }
         }
 
-        return maxMessages;
+        if (messageNum == 0) {
+            messageNum = maxMessages;
+            CopyOnWriteArrayList<Consumer> consumers = this.getConsumers();
+            for (Consumer consumer1 : consumers) {
+                if (isApendMessage(consumer1, entries.get(0).getLedgerId(),
+                        entries.get(0).getEntryId())) {
+                    messageNum = 0;
+                    break;
+                }
+            }
+        }
+        return messageNum;
+    }
+
+    private boolean isApendMessage(Consumer consuemr, long ledgerId, long entryId) {
+        if (consuemr != null && consuemr.getPendingAcks().get(ledgerId, entryId) != null) {

Review comment:
       ```suggestion
           if (consumer != null && consumer.getPendingAcks().get(ledgerId, entryId) != null) {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -322,17 +325,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
                 maxReadPosition = minReadPositionForRecentJoinedConsumer;
             }
         }
+
         // Here, the consumer is one that has recently joined, so we can only send messages that were
         // published before it has joined.
+        int messageNum = 0;
         for (int i = 0; i < maxMessages; i++) {
             if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                 // We have already crossed the divider line. All messages in the list are now
                 // newer than what we can currently dispatch to this consumer
-                return i;
+                messageNum  = i;
+                break;
             }
         }
 
-        return maxMessages;
+        if (messageNum == 0) {
+            messageNum = maxMessages;
+            CopyOnWriteArrayList<Consumer> consumers = this.getConsumers();
+            for (Consumer consumer1 : consumers) {
+                if (isApendMessage(consumer1, entries.get(0).getLedgerId(),
+                        entries.get(0).getEntryId())) {
+                    messageNum = 0;
+                    break;
+                }
+            }
+        }
+        return messageNum;
+    }
+
+    private boolean isApendMessage(Consumer consuemr, long ledgerId, long entryId) {

Review comment:
       ```suggestion
       private boolean isPendingAckMessage(Consumer consumer, long ledgerId, long entryId) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org