You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/04 11:18:51 UTC

[pulsar] 03/14: Add close method in the class of NegativeAcksTracker (#12469)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8fb92f163afca8c3674b2e03d4ef538f04d38a8c
Author: chenlin <15...@qq.com>
AuthorDate: Sun Oct 31 17:14:00 2021 +0800

    Add close method in the class of NegativeAcksTracker (#12469)
    
    (cherry picked from commit 3694aa1554e7f408a90eda2ba46eae17b425140a)
---
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java |  1 +
 .../apache/pulsar/client/impl/NegativeAcksTracker.java   | 16 +++++++++++++++-
 .../org/apache/pulsar/client/impl/ConsumerImplTest.java  | 14 ++++++++++++++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 352ee86..ce4711d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -960,6 +960,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (batchReceiveTimeout != null) {
             batchReceiveTimeout.cancel();
         }
+        negativeAcksTracker.close();
         stats.getStatTimeout().ifPresent(Timeout::cancel);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 16cfa0c..a062009 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 
+import java.io.Closeable;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -30,7 +31,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
 
-class NegativeAcksTracker {
+class NegativeAcksTracker implements Closeable {
 
     private HashMap<MessageId, Long> nackedMessages = null;
 
@@ -93,4 +94,17 @@ class NegativeAcksTracker {
             this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
         }
     }
+
+    @Override
+    public synchronized void close() { // cancels the pending timeout and discards the tracked nacks
+        if (timeout != null && !timeout.isCancelled()) {
+            timeout.cancel();
+            timeout = null; // reference is cleared only when this call performed the cancel
+        }
+        // Empty the map first, then drop the reference to it.
+        if (nackedMessages != null) {
+            nackedMessages.clear();
+            nackedMessages = null;
+        }
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 37c9e0c..8a9e665 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -187,4 +187,18 @@ public class ConsumerImplTest {
         // then
         Assert.assertFalse(consumer.hasPendingBatchReceive());
     }
+
+    /**
+     * Verifies that a consumer holding a negatively acknowledged message can be closed without throwing.
+     * Exceptions propagate to the test runner so a failure is reported with its original stack trace.
+     */
+    @Test
+    public void testClose() throws Exception {
+        // Fail instead of passing vacuously when the fixture is missing.
+        Assert.assertNotNull(consumer);
+        // Give the consumer a negatively acknowledged message before closing it.
+        consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
+        // Must return normally; any exception fails the test.
+        consumer.close();
+    }
 }