You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/10/11 03:12:27 UTC
[pulsar] branch branch-3.0 updated: [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ff4845b1973 [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
ff4845b1973 is described below
commit ff4845b1973cd65c911d2659f91d65fa76b0cccb
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Sep 23 22:15:23 2023 +0800
[fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
(cherry picked from commit be4bcac11ae76cdf3d4c4b0639735f3309919e4c)
---
.../client/impl/PatternTopicsConsumerImplTest.java | 55 ++++++++++++++++++++++
1 file changed, 55 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index d1c569565bc..f01a1a2d07d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -67,6 +68,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
isTcpLookup = true;
// enabled transaction, to test pattern consumers not subscribe to transaction system topic.
conf.setTransactionCoordinatorEnabled(true);
+ conf.setSubscriptionPatternMaxLength(10000);
super.internalSetup();
super.producerBaseSetup();
}
@@ -210,6 +212,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -287,6 +295,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -364,6 +378,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -455,6 +475,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -525,6 +551,11 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
// 3. verify consumer get methods, to get 5 number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
@@ -605,6 +636,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
// 1. create partition
String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
@@ -665,6 +702,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
@@ -775,6 +818,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
@@ -861,6 +910,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.subscriptionName("sub")
.subscribe();
+ // Wait until the topic list watcher has been created successfully.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+ });
+
assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;