You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:49:04 UTC

[pulsar] 14/14: [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06a467825aa1c1ec10518a3300088a1dfc96c594
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Mar 24 22:44:40 2022 +0800

    [fix][test]: fix flaky test testTransactionBufferRecoverThrowPulsarClientException (#14846)
    
    Co-authored-by: congbo <co...@github.com>
    (cherry picked from commit dca5a901528e77f218afb9870b223f06143b055f)
---
 .../TopicTransactionBufferRecoverTest.java         | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 01e03a4..392a21f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -454,7 +454,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
     @Test(timeOut=30000)
     public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
-        String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException";
+        String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
         @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
@@ -470,6 +470,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         producer.newMessage(txn).value("test".getBytes()).sendAsync();
         txn.commit().get();
 
+        producer.close();
+
         PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
         TransactionBufferSnapshotService transactionBufferSnapshotService =
@@ -479,6 +481,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
         doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
         doReturn(CompletableFuture.completedFuture(writer)).when(transactionBufferSnapshotService).createWriter(any());
+        doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
+        doReturn(CompletableFuture.completedFuture(null)).when(writer).closeAsync();
         Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
         field.setAccessible(true);
         TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
@@ -487,7 +491,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
         // check reader close topic
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock create reader fail
@@ -497,7 +501,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
         doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
 
         // check create writer fail close topic
@@ -507,7 +511,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
                 .when(transactionBufferSnapshotService).createWriter(any());
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                transactionBufferSnapshotService, originalTopic, field);
 
     }
 
@@ -515,7 +519,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
                                  TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal,
                                  TransactionBufferSnapshotService transactionBufferSnapshotService,
                                  PersistentTopic originalTopic,
-                                 Field field, Producer<byte[]> producer) throws Exception {
+                                 Field field) throws Exception {
         field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
 
         // recover again will throw then close topic
@@ -529,13 +533,19 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
         field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
 
-        // topic recover success
         Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(originalTopic.getName())
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
         producer.newMessage(txn).value("test".getBytes()).sendAsync();
         txn.commit().get();
+        producer.close();
     }
 
 }