You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/02 11:57:19 UTC

kafka git commit: HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging

Repository: kafka
Updated Branches:
  refs/heads/0.10.0 f2405a73e -> ce34614a4


HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging

The KafkaStreamsTest can occasionally hang if the test doesn't run fast enough. This is due to there being no brokers available on the broker.urls provided to the StreamsConfig. The KafkaConsumer does a poll and blocks causing the test to never complete.

Author: Damian Guy <da...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1693 from dguy/kafka-streams-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce34614a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce34614a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce34614a

Branch: refs/heads/0.10.0
Commit: ce34614a43fb1f43ef6b5660fb37f7a0598d177a
Parents: f2405a7
Author: Damian Guy <da...@gmail.com>
Authored: Tue Aug 2 12:41:20 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Aug 2 12:41:20 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ce34614a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index af7e681..f8293b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.File;
@@ -31,11 +33,16 @@ import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
 
+    // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
+    // quick enough
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
     @Test
     public void testStartAndClose() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
@@ -58,7 +65,7 @@ public class KafkaStreamsTest {
     public void testCloseIsIdempotent() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
         final KStreamBuilder builder = new KStreamBuilder();
@@ -75,7 +82,7 @@ public class KafkaStreamsTest {
     public void testCannotStartOnceClosed() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -95,7 +102,7 @@ public class KafkaStreamsTest {
     public void testCannotStartTwice() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -115,7 +122,7 @@ public class KafkaStreamsTest {
     public void testCleanup() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -137,7 +144,7 @@ public class KafkaStreamsTest {
         final File stateDirApp2 = new File(stateDir + File.separator + appId2);
 
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
 
         assertFalse(stateDirApp1.exists());
@@ -164,7 +171,7 @@ public class KafkaStreamsTest {
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);