You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/24 21:24:27 UTC
[kafka] branch trunk updated: KAFKA-6462: fix unstable ResetIntegrationTest (#4446)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 52191c3 KAFKA-6462: fix unstable ResetIntegrationTest (#4446)
52191c3 is described below
commit 52191c34986431673580575264c5c0414ae20243
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jan 24 13:24:23 2018 -0800
KAFKA-6462: fix unstable ResetIntegrationTest (#4446)
Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
---
.../integration/AbstractResetIntegrationTest.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 5819b6d..15d0332 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -167,11 +168,24 @@ public abstract class AbstractResetIntegrationTest {
private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
void prepareTest() throws Exception {
- cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
-
prepareConfigs();
prepareEnvironment();
+ // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
+ while (true) {
+ Thread.sleep(50);
+
+ try {
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ } catch (final TimeoutException e) {
+ continue;
+ }
+ break;
+ }
+
+ cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
add10InputElements();
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.