You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 13:24:50 UTC
[pulsar] 02/02: [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7be144f51b51b08474b79ec3a3ec4fcc686bc908
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 19 07:43:47 2022 +0800
[fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159)
(cherry picked from commit 84968e84c109ccc690fdd5779d2f09a50cd9da24)
---
.../pulsar/broker/intercept/BrokerInterceptorTest.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index d0c163f4356..0e46b147e7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -111,7 +112,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
BrokerInterceptor listener = pulsar.getBrokerInterceptor();
Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
admin.namespaces().createNamespace("public/test", 4);
- Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 1);
}
@Test
@@ -120,7 +121,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
pulsarClient.newProducer(Schema.BOOL).topic("test").create();
// CONNECT and PRODUCER
- Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 2);
}
@Test
@@ -130,7 +131,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
pulsarClient.newProducer(Schema.BOOL).topic("test").create();
pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
// single connection for both producer and consumer
- Assert.assertTrue(((CounterBrokerInterceptor)listener).getConnectionCreationCount() == 1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConnectionCreationCount() == 1);
}
@Test
@@ -139,7 +140,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
Assert.assertTrue(((CounterBrokerInterceptor)listener).getProducerCount() == 0);
pulsarClient.newProducer(Schema.BOOL).topic("test").create();
- Assert.assertTrue(((CounterBrokerInterceptor)listener).getProducerCount() == 1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
}
@Test
@@ -148,7 +149,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
Assert.assertTrue(((CounterBrokerInterceptor)listener).getConsumerCount() == 0);
pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
- Assert.assertTrue(((CounterBrokerInterceptor)listener).getConsumerCount() == 1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
}
@Test
@@ -175,8 +176,8 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
assertEquals(msg.getValue(), "hello world");
- assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1);
- assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+ Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getMessageDispatchCount() == 1);
}
@Test
@@ -205,6 +206,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
}
});
future.get();
+ Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty());
CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
Assert.assertEquals(responseEvent.getResponseStatus(),