You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 15:16:08 UTC
[pulsar] branch master updated: Ensure the timer tasks in the
producer and consumer are cleanup after closed (#7124)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7dd0c9b Ensure the timer tasks in the producer and consumer are cleanup after closed (#7124)
7dd0c9b is described below
commit 7dd0c9b4f4cb9118fd275943d821b03fb6927004
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jun 3 23:15:56 2020 +0800
Ensure the timer tasks in the producer and consumer are cleanup after closed (#7124)
### Motivation
Ensure that the timer tasks in the producer and consumer are cleaned up after they are closed.
### Verifying this change
Unit tests added (ConsumerCleanupTest and ProducerCleanupTest).
---
.../SystemTopicBasedTopicPoliciesServiceTest.java | 2 +-
.../ConsumerCleanupTest.java} | 48 +++++++--------------
.../ProducerCleanupTest.java} | 49 +++++++---------------
.../pulsar/client/impl/TopicDoesNotExistsTest.java | 21 +++++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 5 +++
5 files changed, 50 insertions(+), 75 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 69b6a5e..3e51e35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -74,11 +74,11 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
.maxConsumerPerTopic(10)
.build();
systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
- Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
// Wait for all topic policies updated.
Thread.sleep(3000);
+ Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
// Assert broker is cache all topic policies
Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
similarity index 52%
copy from pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
copy to pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index 767b4c7..a0bdff5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -17,62 +17,42 @@
* under the License.
*/
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.client.api;
import io.netty.util.HashedWheelTimer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-/**
- * Tests for not exists topic.
- */
-public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+public class ConsumerCleanupTest extends ProducerConsumerBase {
- @Override
@BeforeClass
- public void setup() throws Exception {
- conf.setAllowAutoTopicCreation(false);
+ @Override
+ protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}
- @Override
@AfterClass
- public void cleanup() throws Exception {
+ @Override
+ protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
- public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
- try {
- pulsarClient.newProducer()
- .topic("persistent://public/default/" + UUID.randomUUID().toString())
- .sendTimeout(100, TimeUnit.MILLISECONDS)
- .create();
- Assert.fail("Create producer should failed while topic does not exists.");
- } catch (PulsarClientException ignore) {
- }
- Thread.sleep(2000);
- HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
- Assert.assertEquals(timer.pendingTimeouts(), 0);
- Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
- pulsarClient.close();
- }
-
- @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
- public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
- pulsarClient.newConsumer()
+ public void testAllTimerTaskShouldCanceledAfterConsumerClosed() throws PulsarClientException, InterruptedException {
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.subscriptionName("test")
.subscribe();
+ consumer.close();
+ Thread.sleep(2000);
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+ Assert.assertEquals(timer.pendingTimeouts(), 0);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
similarity index 50%
copy from pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
copy to pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
index 767b4c7..afc73a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
@@ -16,63 +16,44 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.client.api;
import io.netty.util.HashedWheelTimer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-/**
- * Tests for not exists topic.
- */
-public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+public class ProducerCleanupTest extends ProducerConsumerBase {
+ @BeforeMethod
@Override
- @BeforeClass
- public void setup() throws Exception {
- conf.setAllowAutoTopicCreation(false);
+ protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}
+ @AfterMethod
@Override
- @AfterClass
- public void cleanup() throws Exception {
+ protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
- public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
- try {
- pulsarClient.newProducer()
- .topic("persistent://public/default/" + UUID.randomUUID().toString())
- .sendTimeout(100, TimeUnit.MILLISECONDS)
- .create();
- Assert.fail("Create producer should failed while topic does not exists.");
- } catch (PulsarClientException ignore) {
- }
+ public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException, InterruptedException {
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic("persistent://public/default/" + UUID.randomUUID().toString())
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+ producer.close();
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
- Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
- pulsarClient.close();
- }
-
- @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
- public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
- pulsarClient.newConsumer()
- .topic("persistent://public/default/" + UUID.randomUUID().toString())
- .subscriptionName("test")
- .subscribe();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index 767b4c7..4b35357 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -68,11 +68,20 @@ public class TopicDoesNotExistsTest extends ProducerConsumerBase {
pulsarClient.close();
}
- @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
- public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
- pulsarClient.newConsumer()
- .topic("persistent://public/default/" + UUID.randomUUID().toString())
- .subscriptionName("test")
- .subscribe();
+ @Test
+ public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+ try {
+ pulsarClient.newConsumer()
+ .topic("persistent://public/default/" + UUID.randomUUID().toString())
+ .subscriptionName("test")
+ .subscribe();
+ Assert.fail("Create consumer should failed while topic does not exists.");
+ } catch (PulsarClientException ignore) {
+ }
+ Thread.sleep(2000);
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+ Assert.assertEquals(timer.pendingTimeouts(), 0);
+ Assert.assertEquals(((PulsarClientImpl) pulsarClient).consumersCount(), 0);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 11d9c19..e36ce0c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -832,6 +832,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// auto-topic-creation set to false
// No more retries are needed in this case.
setState(State.Failed);
+ closeConsumerTasks();
client.cleanupConsumer(this);
log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
} else {
@@ -977,6 +978,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
acknowledgmentsGroupingTracker.close();
+ if (batchReceiveTimeout != null) {
+ batchReceiveTimeout.cancel();
+ }
+ stats.getStatTimeout().ifPresent(Timeout::cancel);
}
private void failPendingReceive() {