You are viewing a plain-text version of this content; the canonical link it refers to is not included in this copy.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/08/11 05:41:24 UTC
[pulsar] branch master updated: [fix][test] Fix flaky test `testCloseTransactionBufferWhenTimeout` and close producer/consumer (#20956)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98628845aa9 [fix][test] Fix flaky test `testCloseTransactionBufferWhenTimeout` and close producer/consumer (#20956)
98628845aa9 is described below
commit 98628845aa94b416b0c3711b5a15819ade549c57
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Aug 11 13:41:17 2023 +0800
[fix][test] Fix flaky test `testCloseTransactionBufferWhenTimeout` and close producer/consumer (#20956)
---
.../pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java | 3 +++
.../broker/transaction/TopicTransactionBufferRecoverTest.java | 4 ++++
.../apache/pulsar/broker/transaction/TransactionProduceTest.java | 4 ++++
.../org/apache/pulsar/broker/transaction/TransactionTest.java | 7 ++++++-
.../broker/transaction/buffer/TopicTransactionBufferTest.java | 6 +++---
.../broker/transaction/buffer/TransactionBufferClientTest.java | 8 +++++---
.../broker/transaction/buffer/TransactionLowWaterMarkTest.java | 2 ++
.../broker/transaction/buffer/TransactionStablePositionTest.java | 4 ++++
.../org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java | 2 +-
9 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index 6e763cb44fb..7f557abe0bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -408,7 +408,9 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase {
// Create a topic, send 10 messages without using transactions, and send 10 messages using transactions.
// Abort these transactions and verify the data.
final String topicName = "persistent://" + NAMESPACE2 + "/testSnapshotProcessorUpgrade";
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof SingleSnapshotAbortedTxnProcessorImpl);
@@ -492,6 +494,7 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase {
// Create a new topic in the namespace
String topicName = "persistent://" + namespaceName + "/newTopic";
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof SnapshotSegmentAbortedTxnProcessorImpl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 2d6622571c0..bde9307552f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -510,6 +510,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(enableSnapshotSegment);
String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt();
+ @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
@@ -559,6 +560,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic);
Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
+ @Cleanup
Reader<GenericRecord> reader = pulsarClient.newReader(Schema.AUTO_CONSUME())
.readCompacted(true)
.startMessageId(MessageId.earliest)
@@ -853,11 +855,13 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
this.getPulsarServiceList().get(0).getConfig()
.setTransactionBufferSnapshotMaxTransactionCount(theCountOfSnapshotMaxTxnCount);
// 1. Build producer and consumer
+ @Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
+ @Cleanup
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index ddd8cf07903..32ffd293893 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -230,6 +230,7 @@ public class TransactionProduceTest extends TransactionTestBase {
.build().get();
log.info("init transaction {}.", txn);
+ @Cleanup
Producer<byte[]> incomingProducer = pulsarClient.newProducer()
.topic(ACK_COMMIT_TOPIC)
.batchingMaxMessages(1)
@@ -241,6 +242,7 @@ public class TransactionProduceTest extends TransactionTestBase {
}
log.info("prepare incoming messages finished.");
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(ACK_COMMIT_TOPIC)
.subscriptionName(subscriptionName)
@@ -292,6 +294,7 @@ public class TransactionProduceTest extends TransactionTestBase {
.build().get();
log.info("init transaction {}.", txn);
+ @Cleanup
Producer<byte[]> incomingProducer = pulsarClient.newProducer()
.topic(ACK_ABORT_TOPIC)
.batchingMaxMessages(1)
@@ -303,6 +306,7 @@ public class TransactionProduceTest extends TransactionTestBase {
}
log.info("prepare incoming messages finished.");
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(ACK_ABORT_TOPIC)
.subscriptionName(subscriptionName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index cf389824794..905da9379ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -361,12 +361,14 @@ public class TransactionTest extends TransactionTestBase {
ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
//build producer/consumer
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer")
.sendTimeout(0, TimeUnit.SECONDS)
.create();
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
@@ -471,7 +473,8 @@ public class TransactionTest extends TransactionTestBase {
}
admin.topics().setRetention(topic,
new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic));
- pulsarClient.newConsumer().topic(topic)
+ @Cleanup
+ final Consumer<byte[]> subscribe = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscribe();
pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
@@ -533,6 +536,7 @@ public class TransactionTest extends TransactionTestBase {
.untilAsserted(() -> Assert.assertFalse(reader.hasMessageAvailable()));
//test take snapshot by build producer by the transactionEnable client
+ @Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
.topic(topic).enableBatching(true)
@@ -652,6 +656,7 @@ public class TransactionTest extends TransactionTestBase {
Assert.assertEquals(position.getEntryId(), messageId.getEntryId());
//test the state of TransactionBuffer is Ready after build Producer by pulsarClient that enables transaction.
+ @Cleanup
Producer<String> txnProducer = pulsarClient.newProducer(Schema.STRING)
.producerName("testTransactionPublish")
.topic(topic)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index aa98fc7d701..a50363c861c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -147,13 +147,13 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
@Test
public void testCloseTransactionBufferWhenTimeout() throws Exception {
- String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
+ String topic = "persistent://" + NAMESPACE1 + "/testCloseTransactionBufferWhenTimeout";
PulsarService pulsar = pulsarServiceList.get(0);
BrokerService brokerService0 = pulsar.getBrokerService();
BrokerService brokerService = Mockito.spy(brokerService0);
AtomicReference<PersistentTopic> reference = new AtomicReference<>();
- pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
- long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);
+ pulsar.getConfiguration().setTopicLoadTimeoutSeconds(5);
+ long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 3);
Mockito
.doAnswer(inv -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 2cfc9f46f0e..6dde7d3cf37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -155,11 +155,13 @@ public class TransactionBufferClientTest extends TransactionTestBase {
break;
}
}
- pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ @Cleanup
+ PulsarClient localPulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.enableTransaction(true).build();
@Cleanup
- Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
- Transaction transaction = pulsarClient.newTransaction()
+ Producer<byte[]> producer = localPulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
+ Transaction transaction = localPulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build().get();
producer.newMessage(transaction).send();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 3901f186d81..aa7240a59f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -106,6 +106,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
+ @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(TOPIC)
@@ -113,6 +114,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
.enableBatching(false)
.create();
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(TOPIC)
.subscriptionName("test")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index 7493b25ac1d..b3bd2ec4660 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -78,6 +78,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
+ @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(TOPIC)
@@ -85,6 +86,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
.enableBatching(false)
.create();
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(TOPIC)
.subscriptionName("test")
@@ -124,6 +126,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
+ @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(TOPIC)
@@ -131,6 +134,7 @@ public class TransactionStablePositionTest extends TransactionTestBase {
.enableBatching(false)
.create();
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(TOPIC)
.subscriptionName("test")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 6f7d8d9c3f1..1d534176e8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -146,7 +146,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
// Wait for all consumers to continue receiving messages.
Awaitility.await()
- .atMost(15, TimeUnit.SECONDS)
+ .atMost(30, TimeUnit.SECONDS)
.pollDelay(5, TimeUnit.SECONDS)
.until(() ->
(System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));