Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/15 19:40:42 UTC

kafka git commit: KAFKA-6215: fix transient failures in KafkaStreamsTest

Repository: kafka
Updated Branches:
  refs/heads/trunk f300480f8 -> 1756a22f7


KAFKA-6215: fix transient failures in KafkaStreamsTest

 - set Streams state.dir to a dedicated per-test directory (the default under /tmp is not reliable); a sketch of the resulting shared setup follows
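
For context, the patch below also moves the per-test Properties construction into a shared fixture so every test picks up the same state.dir override. The @Before setup here is only a minimal sketch of what that shared fixture plausibly looks like, not the patched code itself: the class name, the setUp() method name, and the hard-coded bootstrap.servers placeholder (the real test uses the embedded CLUSTER) are assumptions for illustration.

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.test.TestUtils;
    import org.junit.Before;
    import org.junit.Test;

    public class SharedPropsSketchTest {

        // shared across tests; individual tests add or override entries as needed
        private Properties props;

        @Before
        public void setUp() {
            props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
            // placeholder; the real test points this at the embedded CLUSTER's bootstrap servers
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // the actual fix: a fresh per-test state directory instead of the default under /tmp
            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        }

        @Test
        public void testCleanup() {
            // tests now build KafkaStreams straight from the shared props
            final StreamsBuilder builder = new StreamsBuilder();
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.cleanUp();
            streams.close();
        }
    }

The point of the override is that TestUtils.tempDirectory() gives each test run its own directory, which is cleaned up afterwards, instead of the machine-wide /tmp default whose leftover or concurrently-used state caused the transient failures.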

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ted Yu <yu...@gmail.com>

Closes #4221 from mjsax/minor-fix-instable-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1756a22f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1756a22f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1756a22f

Branch: refs/heads/trunk
Commit: 1756a22f7923766175ee4f90bed074cf1b60f932
Parents: f300480
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 15 11:40:39 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 15 11:40:39 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 46 ++------------------
 1 file changed, 4 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1756a22f/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 69b4584..dd3b9af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -300,7 +299,9 @@ public class KafkaStreamsTest {
 
     @Test
     public void testNumberDefaultMetrics() {
-        final KafkaStreams streams = createKafkaStreams();
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final Map<MetricName, ? extends Metric> metrics = streams.metrics();
         // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
         assertEquals(23, metrics.size());
@@ -308,9 +309,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testIllegalMetricsConfig() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -322,9 +320,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testLegalMetricsConfig() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
         final StreamsBuilder builder1 = new StreamsBuilder();
         final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props);
@@ -364,11 +359,6 @@ public class KafkaStreamsTest {
     public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
         final AtomicBoolean keepRunning = new AtomicBoolean(true);
         try {
-            final Properties props = new Properties();
-            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
             final StreamsBuilder builder = new StreamsBuilder();
             final CountDownLatch latch = new CountDownLatch(1);
             final String topic = "input";
@@ -409,11 +399,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldReturnThreadMetadata() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-        props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
         Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
@@ -427,22 +413,8 @@ public class KafkaStreamsTest {
         streams.close();
     }
 
-
-    private KafkaStreams createKafkaStreams() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        return new KafkaStreams(builder.build(), props);
-    }
-
     @Test
     public void testCleanup() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
         final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
@@ -454,10 +426,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testCannotCleanupWhileRunning() throws InterruptedException {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
         final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
@@ -492,13 +460,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldCleanupOldStateDirs() throws InterruptedException {
-        final Properties props = new Properties();
-        final String appId = "cleanupOldStateDirs";
-        final String stateDir = TestUtils.tempDirectory().getPath();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
 
 
         final String topic = "topic";
@@ -518,7 +480,7 @@ public class KafkaStreamsTest {
                 }
             }
         });
-        final String appDir = stateDir + File.separator + appId;
+        final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
         final File oldTaskDir = new File(appDir, "10_1");
         assertTrue(oldTaskDir.mkdirs());
         try {