You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/18 23:43:54 UTC

[pulsar] branch master updated: [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 84968e84c10 [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)
84968e84c10 is described below

commit 84968e84c109ccc690fdd5779d2f09a50cd9da24
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 19 07:43:47 2022 +0800

    [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)
---
 .../broker/intercept/BrokerInterceptorTest.java       | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index a26783cda65..b2c3b8d711f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -114,7 +115,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         BrokerInterceptor listener = pulsar.getBrokerInterceptor();
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         admin.namespaces().createNamespace("public/test", 4);
-        Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 1);
     }
 
     @Test
@@ -123,7 +124,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         pulsarClient.newProducer(Schema.BOOL).topic("test").create();
         // CONNECT and PRODUCER
-        Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 2);
     }
 
     @Test
@@ -133,7 +134,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         pulsarClient.newProducer(Schema.BOOL).topic("test").create();
         pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
         // single connection for both producer and consumer
-        assertEquals(((CounterBrokerInterceptor) listener).getConnectionCreationCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConnectionCreationCount() == 1);
     }
 
     @Test
@@ -142,7 +143,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         assertEquals(((CounterBrokerInterceptor) listener).getProducerCount(), 0);
         pulsarClient.newProducer(Schema.BOOL).topic("test").create();
-        assertEquals(((CounterBrokerInterceptor) listener).getProducerCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
     }
 
     @Test
@@ -151,7 +152,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
         assertEquals(((CounterBrokerInterceptor) listener).getConsumerCount(), 0);
         pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
-        assertEquals(((CounterBrokerInterceptor) listener).getConsumerCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
     }
 
     @Test
@@ -178,8 +179,8 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
 
         assertEquals(msg.getValue(), "hello world");
 
-        assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1);
-        assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getMessageDispatchCount() == 1);
     }
 
     @Test
@@ -195,8 +196,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
             Message<String> message = consumer.receive();
             consumer.acknowledge(message);
         }
-
-        Assert.assertEquals(((CounterBrokerInterceptor) interceptor).getHandleAckCount(), 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) interceptor).getHandleAckCount() == 1);
     }
 
     @Test
@@ -225,6 +225,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
             }
         });
         future.get();
+        Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty());
         CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
         Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
         Assert.assertEquals(responseEvent.getResponseStatus(),