You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:36 UTC
[pulsar] 07/11: [pulsar-client] clean up MultiTopicsConsumerImpl
reference on consumer creation failure (#11754)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit de813bf35afc7038465ecdf61921bf554293aa1b
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Aug 23 19:34:17 2021 -0700
[pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754)
(cherry picked from commit f154de74830ca2eaca67d01322fcb9a557d649ce)
---
.../pulsar/client/impl/MultiTopicsConsumerImpl.java | 15 ++++++++++++---
.../apache/pulsar/client/impl/UnAckedMessageTracker.java | 1 +
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ce84376..c9e2067 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -103,7 +103,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStatsRecorder stats;
- private final UnAckedMessageTracker unAckedMessageTracker;
+ private UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
private volatile BatchMessageIdImpl startMessageId = null;
@@ -543,7 +543,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
@Override
public CompletableFuture<Void> closeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
- unAckedMessageTracker.close();
+ if (unAckedMessageTracker != null) {
+ unAckedMessageTracker.close();
+ }
return CompletableFuture.completedFuture(null);
}
setState(State.Closing);
@@ -580,7 +582,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
private void cleanupMultiConsumer() {
- unAckedMessageTracker.close();
+ if (unAckedMessageTracker != null) {
+ unAckedMessageTracker.close();
+ unAckedMessageTracker = null;
+ }
+ if (partitionsAutoUpdateTimeout != null) {
+ partitionsAutoUpdateTimeout.cancel();
+ partitionsAutoUpdateTimeout = null;
+ }
client.cleanupConsumer(this);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index a244366..db616f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -248,6 +248,7 @@ public class UnAckedMessageTracker implements Closeable {
try {
if (timeout != null && !timeout.isCancelled()) {
timeout.cancel();
+ timeout = null;
}
this.clear();
} finally {