You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/04 11:18:51 UTC
[pulsar] 03/14: Add close method in the class of
NegativeAcksTracker (#12469)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8fb92f163afca8c3674b2e03d4ef538f04d38a8c
Author: chenlin <15...@qq.com>
AuthorDate: Sun Oct 31 17:14:00 2021 +0800
Add close method in the class of NegativeAcksTracker (#12469)
(cherry picked from commit 3694aa1554e7f408a90eda2ba46eae17b425140a)
---
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 1 +
.../apache/pulsar/client/impl/NegativeAcksTracker.java | 16 +++++++++++++++-
.../org/apache/pulsar/client/impl/ConsumerImplTest.java | 14 ++++++++++++++
3 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 352ee86..ce4711d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -960,6 +960,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (batchReceiveTimeout != null) {
batchReceiveTimeout.cancel();
}
+ negativeAcksTracker.close();
stats.getStatTimeout().ifPresent(Timeout::cancel);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 16cfa0c..a062009 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import io.netty.util.Timeout;
import io.netty.util.Timer;
+import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
@@ -30,7 +31,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
-class NegativeAcksTracker {
+class NegativeAcksTracker implements Closeable {
private HashMap<MessageId, Long> nackedMessages = null;
@@ -93,4 +94,17 @@ class NegativeAcksTracker {
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
}
+
+ @Override
+ public synchronized void close() {
+ if (timeout != null && !timeout.isCancelled()) {
+ timeout.cancel();
+ timeout = null;
+ }
+
+ if (nackedMessages != null) {
+ nackedMessages.clear();
+ nackedMessages = null;
+ }
+ }
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 37c9e0c..8a9e665 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -187,4 +187,18 @@ public class ConsumerImplTest {
// then
Assert.assertFalse(consumer.hasPendingBatchReceive());
}
+
+ @Test
+ public void testClose() {
+ Exception checkException = null;
+ try {
+ if (consumer != null) {
+ consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
+ consumer.close();
+ }
+ } catch (Exception e) {
+ checkException = e;
+ }
+ Assert.assertNull(checkException);
+ }
}