You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/11/22 10:55:53 UTC

kafka git commit: MINOR: improve flaky Streams tests

Repository: kafka
Updated Branches:
  refs/heads/0.11.0 8ea4a2826 -> c89c6b873


MINOR: improve flaky Streams tests

Use a TestUtils temporary directory as the state directory instead of the default /tmp/kafka-streams

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>

Closes #4246 from mjsax/improve-flaky-streams-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c89c6b87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c89c6b87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c89c6b87

Branch: refs/heads/0.11.0
Commit: c89c6b87365c8a8482e1ddac23079af7f9faff0c
Parents: 8ea4a28
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 22 10:55:42 2017 +0000
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Nov 22 10:55:42 2017 +0000

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 81 ++++----------------
 .../integration/FanoutIntegrationTest.java      |  2 +
 2 files changed, 16 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c89c6b87/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 8eea60c..064d7b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -63,6 +62,7 @@ public class KafkaStreamsTest {
     // quick enough)
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
     private final KStreamBuilder builder = new KStreamBuilder();
     private KafkaStreams streams;
     private Properties props;
@@ -80,9 +80,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateChanges() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
         StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
         Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED);
@@ -101,9 +98,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateCloseAfterCreate() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
         StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
         streams.close();
@@ -159,25 +153,20 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateThreadClose() throws Exception {
-        final int numThreads = 2;
-        final KStreamBuilder builder = new KStreamBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        final KafkaStreams streams = new KafkaStreams(builder, props);
 
-        testStateThreadCloseHelper(numThreads);
+        streams = new KafkaStreams(new KStreamBuilder(), props);
+
+        testStateThreadCloseHelper(NUM_THREADS);
     }
     
     @Test
     public void testStateGlobalThreadClose() throws Exception {
-        final int numThreads = 2;
         final KStreamBuilder builder = new KStreamBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
+        streams = new KafkaStreams(builder, props);
 
         streams.start();
         TestUtils.waitForCondition(new TestCondition() {
@@ -260,7 +249,8 @@ public class KafkaStreamsTest {
 
     @Test
     public void testNumberDefaultMetrics() {
-        final KafkaStreams streams = createKafkaStreams();
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streams = new KafkaStreams(builder, props);
         final Map<MetricName, ? extends Metric> metrics = streams.metrics();
         // all 15 default StreamThread metrics + 1 metric that keeps track of number of metrics
         assertEquals(metrics.size(), 16);
@@ -268,11 +258,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void testIllegalMetricsConfig() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
-        final KStreamBuilder builder = new KStreamBuilder();
 
         try {
             new KafkaStreams(builder, props);
@@ -282,17 +268,12 @@ public class KafkaStreamsTest {
 
     @Test
     public void testLegalMetricsConfig() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-        final KStreamBuilder builder1 = new KStreamBuilder();
-        final KafkaStreams streams1 = new KafkaStreams(builder1, props);
+        final KafkaStreams streams1 = new KafkaStreams(builder, props);
         streams1.close();
 
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
-        final KStreamBuilder builder2 = new KStreamBuilder();
-        final KafkaStreams streams2 = new KafkaStreams(builder2, props);
+        new KafkaStreams(builder, props);
 
     }
 
@@ -325,11 +306,6 @@ public class KafkaStreamsTest {
     public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
         final AtomicBoolean keepRunning = new AtomicBoolean(true);
         try {
-            final Properties props = new Properties();
-            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
             final KStreamBuilder builder = new KStreamBuilder();
             final CountDownLatch latch = new CountDownLatch(1);
             final String topic = "input";
@@ -349,7 +325,8 @@ public class KafkaStreamsTest {
                             }
                         }
                     });
-            final KafkaStreams streams = new KafkaStreams(builder, props);
+
+            streams = new KafkaStreams(builder, props);
             streams.start();
             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
                                                                             Collections.singletonList(new KeyValue<>("A", "A")),
@@ -369,24 +346,8 @@ public class KafkaStreamsTest {
     }
 
 
-    private KafkaStreams createKafkaStreams() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        return new KafkaStreams(builder, props);
-    }
-
     @Test
     public void testCleanup() throws Exception {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
         streams.cleanUp();
         streams.start();
         streams.close();
@@ -395,13 +356,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testCannotCleanupWhileRunning() throws Exception {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
         streams.start();
         try {
             streams.cleanUp();
@@ -428,22 +382,15 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldCleanupOldStateDirs() throws InterruptedException {
-        final Properties props = new Properties();
-        final String appId = "cleanupOldStateDirs";
-        final String stateDir = TestUtils.tempDirectory().getPath();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
-
 
         final String topic = "topic";
         CLUSTER.createTopic(topic);
-        final KStreamBuilder builder = new KStreamBuilder();
 
+        final KStreamBuilder builder = new KStreamBuilder();
         builder.table(Serdes.String(), Serdes.String(), topic, topic);
 
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        streams = new KafkaStreams(builder, props);
         final CountDownLatch latch = new CountDownLatch(1);
         streams.setStateListener(new KafkaStreams.StateListener() {
             @Override
@@ -453,7 +400,7 @@ public class KafkaStreamsTest {
                 }
             }
         });
-        final String appDir = stateDir + File.separator + appId;
+        final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
         final File oldTaskDir = new File(appDir, "10_1");
         assertTrue(oldTaskDir.mkdirs());
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c89c6b87/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index f1f09a4..5f3be85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -101,6 +102,7 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");