You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/14 21:56:10 UTC

[pulsar] 03/03: Fix issue where Key_Shared consumers could get stuck (#10920)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8065d6c4015a34fdc48017e15866b034f56849e0
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Jun 15 06:50:59 2021 +0900

    Fix issue where Key_Shared consumers could get stuck (#10920)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java           | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5145c3b..d9a56a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -121,8 +121,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
-        super.removeConsumer(consumer);
+        // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+        // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+        // messagesToRedeliver. If the consumer has not been removed from the selector at this point,
+        // the broker will try to redeliver the messages to the consumer that has already been closed.
+        // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move,
+        // eventually causing all consumers to get stuck.
         selector.removeConsumer(consumer);
+        super.removeConsumer(consumer);
         if (recentlyJoinedConsumers != null) {
             recentlyJoinedConsumers.remove(consumer);
             if (consumerList.size() == 1) {