You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/24 21:24:27 UTC

[kafka] branch trunk updated: KAFKA-6462: fix unstable ResetIntegrationTest (#4446)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 52191c3  KAFKA-6462: fix unstable ResetIntegrationTest (#4446)
52191c3 is described below

commit 52191c34986431673580575264c5c0414ae20243
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jan 24 13:24:23 2018 -0800

    KAFKA-6462: fix unstable ResetIntegrationTest (#4446)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
---
 .../integration/AbstractResetIntegrationTest.java      | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 5819b6d..15d0332 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -167,11 +168,24 @@ public abstract class AbstractResetIntegrationTest {
     private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
 
     void prepareTest() throws Exception {
-        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
-
         prepareConfigs();
         prepareEnvironment();
 
+        // busy wait until the cluster (i.e., ConsumerGroupCoordinator) is available
+        while (true) {
+            Thread.sleep(50);
+
+            try {
+                TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                    "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+            } catch (final TimeoutException e) {
+                continue;
+            }
+            break;
+        }
+
+        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
         add10InputElements();
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.