You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/14 21:56:10 UTC
[pulsar] 03/03: Fix issue where Key_Shared consumers could get stuck (#10920)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8065d6c4015a34fdc48017e15866b034f56849e0
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Jun 15 06:50:59 2021 +0900
Fix issue where Key_Shared consumers could get stuck (#10920)
---
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5145c3b..d9a56a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -121,8 +121,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
- super.removeConsumer(consumer);
+ // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+ // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+ // messagesToRedeliver. If the consumer has not been removed from the selector at this point,
+ // the broker will try to redeliver the messages to the consumer that has already been closed.
+ // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move,
+ // eventually causing all consumers to get stuck.
selector.removeConsumer(consumer);
+ super.removeConsumer(consumer);
if (recentlyJoinedConsumers != null) {
recentlyJoinedConsumers.remove(consumer);
if (consumerList.size() == 1) {