You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/10/11 03:12:27 UTC

[pulsar] branch branch-3.0 updated: [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ff4845b1973 [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
ff4845b1973 is described below

commit ff4845b1973cd65c911d2659f91d65fa76b0cccb
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Sep 23 22:15:23 2023 +0800

    [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
    
    (cherry picked from commit be4bcac11ae76cdf3d4c4b0639735f3309919e4c)
---
 .../client/impl/PatternTopicsConsumerImplTest.java | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index d1c569565bc..f01a1a2d07d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -67,6 +68,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         isTcpLookup = true;
         // enabled transaction, to test pattern consumers not subscribe to transaction system topic.
         conf.setTransactionCoordinatorEnabled(true);
+        conf.setSubscriptionPatternMaxLength(10000);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -210,6 +212,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .subscribe();
         assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -287,6 +295,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
                 .subscribe();
         assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -364,6 +378,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
             .subscribe();
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -455,6 +475,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .subscribe();
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
@@ -525,6 +551,11 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
 
         // 3. verify consumer get methods, to get 5 number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
@@ -605,6 +636,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
                 .receiverQueueSize(4)
                 .subscribe();
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         // 1. create partition
         String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
         TenantInfoImpl tenantInfo = createDefaultTenantInfo();
@@ -665,6 +702,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
@@ -775,6 +818,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
@@ -861,6 +910,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .subscriptionName("sub")
             .subscribe();
 
+        // Wait until the topic list watcher has been created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
+        });
+
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
         PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;