You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:49:04 UTC
[pulsar] 14/14: [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 06a467825aa1c1ec10518a3300088a1dfc96c594
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Mar 24 22:44:40 2022 +0800
[fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)
Co-authored-by: congbo <co...@github.com>
(cherry picked from commit dca5a901528e77f218afb9870b223f06143b055f)
---
.../TopicTransactionBufferRecoverTest.java | 22 ++++++++++++++++------
1 file changed, 16 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 01e03a4..392a21f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -454,7 +454,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
@Test(timeOut=30000)
public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
- String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException";
+ String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
@@ -470,6 +470,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
producer.newMessage(txn).value("test".getBytes()).sendAsync();
txn.commit().get();
+ producer.close();
+
PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
TransactionBufferSnapshotService transactionBufferSnapshotService =
@@ -479,6 +481,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
doReturn(CompletableFuture.completedFuture(writer)).when(transactionBufferSnapshotService).createWriter(any());
+ doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
+ doReturn(CompletableFuture.completedFuture(null)).when(writer).closeAsync();
Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
field.setAccessible(true);
TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
@@ -487,7 +491,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
// check reader close topic
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
- transactionBufferSnapshotService, originalTopic, field, producer);
+ transactionBufferSnapshotService, originalTopic, field);
doReturn(true).when(reader).hasMoreEvents();
// mock create reader fail
@@ -497,7 +501,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
- transactionBufferSnapshotService, originalTopic, field, producer);
+ transactionBufferSnapshotService, originalTopic, field);
doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
// check create writer fail close topic
@@ -507,7 +511,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
.when(transactionBufferSnapshotService).createWriter(any());
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
- transactionBufferSnapshotService, originalTopic, field, producer);
+ transactionBufferSnapshotService, originalTopic, field);
}
@@ -515,7 +519,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal,
TransactionBufferSnapshotService transactionBufferSnapshotService,
PersistentTopic originalTopic,
- Field field, Producer<byte[]> producer) throws Exception {
+ Field field) throws Exception {
field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
// recover again will throw then close topic
@@ -529,13 +533,19 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
- // topic recover success
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .topic(originalTopic.getName())
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
producer.newMessage(txn).value("test".getBytes()).sendAsync();
txn.commit().get();
+ producer.close();
}
}