You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/11/02 18:13:02 UTC

[pulsar] branch master updated: Fix issue producer not closed in PersistentTopicsTest (#12584)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 64d1c5a  Fix issue producer not closed in PersistentTopicsTest (#12584)
64d1c5a is described below

commit 64d1c5aacae1781053383ae20a13d09d4c5646c6
Author: Jason918 <ja...@qq.com>
AuthorDate: Wed Nov 3 02:11:57 2021 +0800

    Fix issue producer not closed in PersistentTopicsTest (#12584)
    
    Co-authored-by: Jiang Haiting <ji...@didichuxing.com>
---
 .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4a27dbb..9c3221d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -47,6 +47,7 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
@@ -699,6 +700,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String topicName = "persistent://prop-xyz/ns1/testGetBacklogSize";
 
         admin.topics().createPartitionedTopic(topicName, 1);
+        @Cleanup
         Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
                 .create();
@@ -718,6 +720,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String topicName = "persistent://prop-xyz/ns1/testGetLastMessageId";
 
         admin.topics().createNonPartitionedTopic(topicName);
+        @Cleanup
         Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName)
                 .enableBatching(true)
                 .batchingMaxMessages(100)
@@ -731,6 +734,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         completableFuture.get();
         Assert.assertEquals(((BatchMessageIdImpl) admin.topics().getLastMessageId(topicName)).getBatchIndex(), 9);
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
                 .create();
@@ -748,6 +752,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String topicName = "persistent://tenant-xyz/ns-abc/topic-123";
 
         admin.topics().createPartitionedTopic(topicName, 2);
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName + "-partition-0").create();
 
         // Check examine message not allowed on partitioned topic.
@@ -786,6 +791,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String topicName = "persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata";
 
         admin.topics().createPartitionedTopic(topicName, 2);
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .producerName("testExamineMessageMetadataProducer")
                 .compressionType(CompressionType.LZ4)
@@ -923,11 +929,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
         admin.topics().createNonPartitionedTopic(topicName1);
         admin.topics().createNonPartitionedTopic(topicName2);
+        @Cleanup
         ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
                 .enableBatching(false).create();
         String data1 = "test1";
         MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());
-
+        @Cleanup
         ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
                 .enableBatching(false).create();
         String data2 = "test2";
@@ -965,6 +972,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         admin.topics().createNonPartitionedTopic(topicName);
 
         AtomicLong publishTime = new AtomicLong(0);
+        @Cleanup
         ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
                 .intercept(new ProducerInterceptor() {
@@ -1018,6 +1026,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
 
+        @Cleanup
         ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
                 .enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MINUTES)