You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 15:16:08 UTC

[pulsar] branch master updated: Ensure the timer tasks in the producer and consumer are cleanup after closed (#7124)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dd0c9b  Ensure the timer tasks in the producer and consumer are cleanup after closed (#7124)
7dd0c9b is described below

commit 7dd0c9b4f4cb9118fd275943d821b03fb6927004
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jun 3 23:15:56 2020 +0800

    Ensure the timer tasks in the producer and consumer are cleanup after closed (#7124)
    
    ### Motivation
    
    Ensure the timer tasks in the producer and consumer are cleanup after closed
    
    ### Verifying this change
    
    Unit test added
---
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  2 +-
 .../ConsumerCleanupTest.java}                      | 48 +++++++--------------
 .../ProducerCleanupTest.java}                      | 49 +++++++---------------
 .../pulsar/client/impl/TopicDoesNotExistsTest.java | 21 +++++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  5 +++
 5 files changed, 50 insertions(+), 75 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 69b6a5e..3e51e35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -74,11 +74,11 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
                 .maxConsumerPerTopic(10)
                 .build();
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
-        Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
 
         // Wait for all topic policies updated.
         Thread.sleep(3000);
 
+        Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
         // Assert broker is cache all topic policies
         Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
similarity index 52%
copy from pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
copy to pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index 767b4c7..a0bdff5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -17,62 +17,42 @@
  * under the License.
  */
 
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.client.api;
 
 import io.netty.util.HashedWheelTimer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
-/**
- * Tests for not exists topic.
- */
-public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+public class ConsumerCleanupTest extends ProducerConsumerBase {
 
-    @Override
     @BeforeClass
-    public void setup() throws Exception {
-        conf.setAllowAutoTopicCreation(false);
+    @Override
+    protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
     }
 
-    @Override
     @AfterClass
-    public void cleanup() throws Exception {
+    @Override
+    protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
     @Test
-    public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
-        try {
-            pulsarClient.newProducer()
-                    .topic("persistent://public/default/" + UUID.randomUUID().toString())
-                    .sendTimeout(100, TimeUnit.MILLISECONDS)
-                    .create();
-            Assert.fail("Create producer should failed while topic does not exists.");
-        } catch (PulsarClientException ignore) {
-        }
-        Thread.sleep(2000);
-        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
-        Assert.assertEquals(timer.pendingTimeouts(), 0);
-        Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
-        pulsarClient.close();
-    }
-
-    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
-    public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
-        pulsarClient.newConsumer()
+    public void testAllTimerTaskShouldCanceledAfterConsumerClosed() throws PulsarClientException, InterruptedException {
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://public/default/" + UUID.randomUUID().toString())
                 .subscriptionName("test")
                 .subscribe();
+        consumer.close();
+        Thread.sleep(2000);
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+        Assert.assertEquals(timer.pendingTimeouts(), 0);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
similarity index 50%
copy from pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
copy to pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
index 767b4c7..afc73a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
@@ -16,63 +16,44 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.client.api;
 
 import io.netty.util.HashedWheelTimer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-/**
- * Tests for not exists topic.
- */
-public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+public class ProducerCleanupTest extends ProducerConsumerBase {
 
+    @BeforeMethod
     @Override
-    @BeforeClass
-    public void setup() throws Exception {
-        conf.setAllowAutoTopicCreation(false);
+    protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
     }
 
+    @AfterMethod
     @Override
-    @AfterClass
-    public void cleanup() throws Exception {
+    protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
     @Test
-    public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
-        try {
-            pulsarClient.newProducer()
-                    .topic("persistent://public/default/" + UUID.randomUUID().toString())
-                    .sendTimeout(100, TimeUnit.MILLISECONDS)
-                    .create();
-            Assert.fail("Create producer should failed while topic does not exists.");
-        } catch (PulsarClientException ignore) {
-        }
+    public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException, InterruptedException {
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+        producer.close();
         Thread.sleep(2000);
         HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
         Assert.assertEquals(timer.pendingTimeouts(), 0);
-        Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
-        pulsarClient.close();
-    }
-
-    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
-    public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
-        pulsarClient.newConsumer()
-                .topic("persistent://public/default/" + UUID.randomUUID().toString())
-                .subscriptionName("test")
-                .subscribe();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index 767b4c7..4b35357 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -68,11 +68,20 @@ public class TopicDoesNotExistsTest extends ProducerConsumerBase {
         pulsarClient.close();
     }
 
-    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
-    public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
-        pulsarClient.newConsumer()
-                .topic("persistent://public/default/" + UUID.randomUUID().toString())
-                .subscriptionName("test")
-                .subscribe();
+    @Test
+    public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+        try {
+            pulsarClient.newConsumer()
+                    .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                    .subscriptionName("test")
+                    .subscribe();
+            Assert.fail("Create consumer should failed while topic does not exists.");
+        } catch (PulsarClientException ignore) {
+        }
+        Thread.sleep(2000);
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+        Assert.assertEquals(timer.pendingTimeouts(), 0);
+        Assert.assertEquals(((PulsarClientImpl) pulsarClient).consumersCount(), 0);
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 11d9c19..e36ce0c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -832,6 +832,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 //    auto-topic-creation set to false
                 // No more retries are needed in this case.
                 setState(State.Failed);
+                closeConsumerTasks();
                 client.cleanupConsumer(this);
                 log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
             } else {
@@ -977,6 +978,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         acknowledgmentsGroupingTracker.close();
+        if (batchReceiveTimeout != null) {
+            batchReceiveTimeout.cancel();
+        }
+        stats.getStatTimeout().ifPresent(Timeout::cancel);
     }
 
     private void failPendingReceive() {