You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/08/11 05:41:24 UTC

[pulsar] branch master updated: [fix][test] Fix flaky test `testCloseTransactionBufferWhenTimeout` and close producer/consumer (#20956)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98628845aa9 [fix][test] Fix flaky test `testCloseTransactionBufferWhenTimeout` and close producer/consumer (#20956)
98628845aa9 is described below

commit 98628845aa94b416b0c3711b5a15819ade549c57
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Aug 11 13:41:17 2023 +0800

    [fix][test] Fix flaky test `testCloseTransactionBufferWhenTimeout` and close producer/consumer (#20956)
---
 .../pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java | 3 +++
 .../broker/transaction/TopicTransactionBufferRecoverTest.java     | 4 ++++
 .../apache/pulsar/broker/transaction/TransactionProduceTest.java  | 4 ++++
 .../org/apache/pulsar/broker/transaction/TransactionTest.java     | 7 ++++++-
 .../broker/transaction/buffer/TopicTransactionBufferTest.java     | 6 +++---
 .../broker/transaction/buffer/TransactionBufferClientTest.java    | 8 +++++---
 .../broker/transaction/buffer/TransactionLowWaterMarkTest.java    | 2 ++
 .../broker/transaction/buffer/TransactionStablePositionTest.java  | 4 ++++
 .../org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java  | 2 +-
 9 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index 6e763cb44fb..7f557abe0bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -408,7 +408,9 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase {
         // Create a topic, send 10 messages without using transactions, and send 10 messages using transactions.
         // Abort these transactions and verify the data.
         final String topicName = "persistent://" + NAMESPACE2 + "/testSnapshotProcessorUpgrade";
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
 
         assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof SingleSnapshotAbortedTxnProcessorImpl);
@@ -492,6 +494,7 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase {
 
         // Create a new topic in the namespace
         String topicName = "persistent://" + namespaceName + "/newTopic";
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         producer.close();
         assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof SnapshotSegmentAbortedTxnProcessorImpl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 2d6622571c0..bde9307552f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -510,6 +510,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(enableSnapshotSegment);
         String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt();
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(topic)
@@ -559,6 +560,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic);
         Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
 
+        @Cleanup
         Reader<GenericRecord> reader = pulsarClient.newReader(Schema.AUTO_CONSUME())
                 .readCompacted(true)
                 .startMessageId(MessageId.earliest)
@@ -853,11 +855,13 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         this.getPulsarServiceList().get(0).getConfig()
                 .setTransactionBufferSnapshotMaxTransactionCount(theCountOfSnapshotMaxTxnCount);
         // 1. Build producer and consumer
+        @Cleanup
         Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                 .topic(topic)
                 .enableBatching(false)
                 .create();
 
+        @Cleanup
         Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName(subName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index ddd8cf07903..32ffd293893 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -230,6 +230,7 @@ public class TransactionProduceTest extends TransactionTestBase {
                 .build().get();
         log.info("init transaction {}.", txn);
 
+        @Cleanup
         Producer<byte[]> incomingProducer = pulsarClient.newProducer()
                 .topic(ACK_COMMIT_TOPIC)
                 .batchingMaxMessages(1)
@@ -241,6 +242,7 @@ public class TransactionProduceTest extends TransactionTestBase {
         }
         log.info("prepare incoming messages finished.");
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(ACK_COMMIT_TOPIC)
                 .subscriptionName(subscriptionName)
@@ -292,6 +294,7 @@ public class TransactionProduceTest extends TransactionTestBase {
                 .build().get();
         log.info("init transaction {}.", txn);
 
+        @Cleanup
         Producer<byte[]> incomingProducer = pulsarClient.newProducer()
                 .topic(ACK_ABORT_TOPIC)
                 .batchingMaxMessages(1)
@@ -303,6 +306,7 @@ public class TransactionProduceTest extends TransactionTestBase {
         }
         log.info("prepare incoming messages finished.");
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(ACK_ABORT_TOPIC)
                 .subscriptionName(subscriptionName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index cf389824794..905da9379ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -361,12 +361,14 @@ public class TransactionTest extends TransactionTestBase {
         ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
 
         //build producer/consumer
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .producerName("producer")
                 .sendTimeout(0, TimeUnit.SECONDS)
                 .create();
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
                 .subscriptionType(SubscriptionType.Exclusive)
@@ -471,7 +473,8 @@ public class TransactionTest extends TransactionTestBase {
         }
         admin.topics().setRetention(topic,
                 new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic));
-        pulsarClient.newConsumer().topic(topic)
+        @Cleanup
+        final Consumer<byte[]> subscribe = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName(subName)
                 .subscribe();
         pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
@@ -533,6 +536,7 @@ public class TransactionTest extends TransactionTestBase {
                 .untilAsserted(() -> Assert.assertFalse(reader.hasMessageAvailable()));
 
         //test take snapshot by build producer by the transactionEnable client
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
                 .topic(topic).enableBatching(true)
@@ -652,6 +656,7 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(position.getEntryId(), messageId.getEntryId());
 
         //test the state of TransactionBuffer is Ready after build Producer by pulsarClient that enables transaction.
+        @Cleanup
         Producer<String> txnProducer = pulsarClient.newProducer(Schema.STRING)
                 .producerName("testTransactionPublish")
                 .topic(topic)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index aa98fc7d701..a50363c861c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -147,13 +147,13 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
 
     @Test
     public void testCloseTransactionBufferWhenTimeout() throws Exception {
-        String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
+        String topic = "persistent://" + NAMESPACE1 + "/testCloseTransactionBufferWhenTimeout";
         PulsarService pulsar = pulsarServiceList.get(0);
         BrokerService brokerService0 = pulsar.getBrokerService();
         BrokerService brokerService = Mockito.spy(brokerService0);
         AtomicReference<PersistentTopic> reference = new AtomicReference<>();
-        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
-        long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(5);
+        long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 3);
 
         Mockito
                 .doAnswer(inv -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 2cfc9f46f0e..6dde7d3cf37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -155,11 +155,13 @@ public class TransactionBufferClientTest extends TransactionTestBase {
                 break;
             }
         }
-        pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+        @Cleanup
+        PulsarClient localPulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
                 .enableTransaction(true).build();
         @Cleanup
-        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
-        Transaction transaction = pulsarClient.newTransaction()
+        Producer<byte[]> producer = localPulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
+        Transaction transaction = localPulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.HOURS)
                 .build().get();
         producer.newMessage(transaction).send();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 3901f186d81..aa7240a59f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -106,6 +106,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(TOPIC)
@@ -113,6 +114,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
                 .enableBatching(false)
                 .create();
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(TOPIC)
                 .subscriptionName("test")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index 7493b25ac1d..b3bd2ec4660 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -78,6 +78,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(TOPIC)
@@ -85,6 +86,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
                 .enableBatching(false)
                 .create();
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(TOPIC)
                 .subscriptionName("test")
@@ -124,6 +126,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(TOPIC)
@@ -131,6 +134,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
                 .enableBatching(false)
                 .create();
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(TOPIC)
                 .subscriptionName("test")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 6f7d8d9c3f1..1d534176e8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -146,7 +146,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
         // Wait for all consumers to continue receiving messages.
         Awaitility.await()
-                .atMost(15, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS)
                 .pollDelay(5, TimeUnit.SECONDS)
                 .until(() ->
                         (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));