You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:36 UTC

[pulsar] 07/11: [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de813bf35afc7038465ecdf61921bf554293aa1b
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Aug 23 19:34:17 2021 -0700

    [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754)
    
    (cherry picked from commit f154de74830ca2eaca67d01322fcb9a557d649ce)
---
 .../pulsar/client/impl/MultiTopicsConsumerImpl.java       | 15 ++++++++++++---
 .../apache/pulsar/client/impl/UnAckedMessageTracker.java  |  1 +
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ce84376..c9e2067 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -103,7 +103,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private final ConsumerStatsRecorder stats;
-    private final UnAckedMessageTracker unAckedMessageTracker;
+    private UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
     private volatile BatchMessageIdImpl startMessageId = null;
@@ -543,7 +543,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
-            unAckedMessageTracker.close();
+            if (unAckedMessageTracker != null) {
+                unAckedMessageTracker.close();
+            }
             return CompletableFuture.completedFuture(null);
         }
         setState(State.Closing);
@@ -580,7 +582,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     private void cleanupMultiConsumer() {
-        unAckedMessageTracker.close();
+        if (unAckedMessageTracker != null) {
+            unAckedMessageTracker.close();
+            unAckedMessageTracker = null;
+        }
+        if (partitionsAutoUpdateTimeout != null) {
+            partitionsAutoUpdateTimeout.cancel();
+            partitionsAutoUpdateTimeout = null;
+        }
         client.cleanupConsumer(this);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index a244366..db616f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -248,6 +248,7 @@ public class UnAckedMessageTracker implements Closeable {
         try {
             if (timeout != null && !timeout.isCancelled()) {
                 timeout.cancel();
+                timeout = null;
             }
             this.clear();
         } finally {