You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/04 10:05:00 UTC
[pulsar] 02/03: fix PatternTopicsChangedListener blocked when topic removed (#16842)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ccb6140190203e51e68e17a9bfe3a758b2fda883
Author: WangJialing <65...@users.noreply.github.com>
AuthorDate: Fri Jul 29 22:30:23 2022 +0800
fix PatternTopicsChangedListener blocked when topic removed (#16842)
(cherry picked from commit 2d6b534ef72824ffb5b2f76568ca8346bbc058c6)
---
.../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6bfbb71c72c..aad4c622734 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1109,7 +1109,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
});
removeTopic(topicName);
- ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+ if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker) {
+ ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+ }
unsubscribeFuture.complete(null);
log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, allTopicPartitionsNumber: {}",
@@ -1157,7 +1159,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
});
removeTopic(topicName);
- ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+ if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker) {
+ ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+ }
unsubscribeFuture.complete(null);
log.info("[{}] [{}] [{}] Removed Topics Consumer, allTopicPartitionsNumber: {}",