You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/21 07:05:44 UTC

[pulsar] branch master updated: [fix][test] flaky test: testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek (#19572)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ec65113b49 [fix][test] flaky test: testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek (#19572)
8ec65113b49 is described below

commit 8ec65113b49af4459b4d8628b18ed8760e540a7e
Author: labuladong <la...@foxmail.com>
AuthorDate: Tue Feb 21 15:05:36 2023 +0800

    [fix][test] flaky test: testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek (#19572)
---
 .../java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 2c2f62529d2..93f2a42bcda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -598,7 +598,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
                 .subscriptionName("my-subscription")
                 .subscribe();
 
-        pulsarClient.newConsumer()
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Failover)
                 .subscriptionName("my-subscription")
@@ -615,8 +615,8 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         }
         assertEquals(connectedSinceSet.size(), 2);
         consumer1.seek(MessageId.earliest);
-        // Wait for consumer to reconnect
-        Awaitility.await().until(consumer1::isConnected);
+        // Wait for consumers to reconnect
+        Awaitility.await().until(() -> consumer1.isConnected() && consumer2.isConnected());
 
         consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
         assertEquals(consumers.size(), 2);