You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/29 22:17:12 UTC

kafka git commit: MINOR: Follow-up from KAFKA-3842 to fix tempDir

Repository: kafka
Updated Branches:
  refs/heads/trunk cf03f349c -> 96ae0a592


MINOR: Follow-up from KAFKA-3842 to fix tempDir

…p directories, waitForCondition

Author: bbejeck <bb...@gmail.com>

Reviewers: Guozhang Wang, Ismael Juma

Closes #1554 from bbejeck/follow_up_for_KAFKA-3842


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ae0a59
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ae0a59
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ae0a59

Branch: refs/heads/trunk
Commit: 96ae0a5929bcd784dc632ddadf083bbedababbb6
Parents: cf03f34
Author: Bill Bejeck <bb...@gmail.com>
Authored: Wed Jun 29 15:17:04 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 29 15:17:04 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   | 50 ++++++++++++--------
 .../InternalTopicIntegrationTest.java           |  2 +-
 .../integration/JoinIntegrationTest.java        |  2 +-
 .../KGroupedStreamIntegrationTest.java          |  2 +-
 .../integration/KStreamRepartitionJoinTest.java |  2 +-
 .../integration/RegexSourceIntegrationTest.java | 19 ++++----
 .../integration/utils/IntegrationTestUtils.java | 17 ++-----
 .../internals/ProcessorTopologyTest.java        |  2 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  2 +-
 9 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 372954a..e06aece 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -123,20 +123,17 @@ public class TestUtils {
      *
      * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
      */
-    public static File tempDirectory(String prefix) throws IOException {
+    public static File tempDirectory(String prefix) {
         return tempDirectory(null, prefix);
     }
 
     /**
-     * Create a temporary directory named "test" under /temp
+     * Create a temporary relative directory in the default temporary-file directory with a
+     * prefix of "kafka-"
      * @return  the temporary directory just created.
      */
-    public static File tempDir() {
-        try {
-            return tempDirectory(new File("/tmp").toPath(), "test");
-        } catch (IOException ex) {
-            throw new RuntimeException("Failed to create a temp dir", ex);
-        }
+    public static File tempDirectory() {
+        return tempDirectory(null);
     }
 
     /**
@@ -145,10 +142,15 @@ public class TestUtils {
      * @param parent The parent folder path name, if null using the default temporary-file directory
      * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
      */
-    public static File tempDirectory(Path parent, String prefix) throws IOException {
-        final File file = parent == null ?
-                Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() :
-                Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile();
+    public static File tempDirectory(Path parent, String prefix) {
+        final File file;
+        prefix = prefix == null ? "kafka-" : prefix;
+        try {
+            file = parent == null ?
+                    Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile();
+        } catch (IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
         file.deleteOnExit();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -225,21 +227,29 @@ public class TestUtils {
     }
 
     /**
-     *  uses default value of 30 seconds for timeout
+     *  uses default value of 15 seconds for timeout
      */
-    public static void waitForCondition(TestCondition testCondition) throws InterruptedException {
-        waitForCondition(testCondition, 30000);
+    public static void waitForCondition(TestCondition testCondition, String conditionDetails) throws InterruptedException {
+        waitForCondition(testCondition, 15000, conditionDetails);
     }
 
     /**
-     *  Used to wait for specific conditions/state to be me during a test
-     *  this is meant to be a replacement for using Thread.sleep
+     * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise.
+     * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
+     * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
+     * avoid transient failures due to slow or overloaded machines.
      */
-    public static void waitForCondition(TestCondition testCondition, long maxTimeMillis) throws InterruptedException {
+    public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
         long startTime = System.currentTimeMillis();
 
-        while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) {
-            Thread.sleep(Math.min(maxTimeMillis, 100L));
+
+        while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
+            Thread.sleep(Math.min(maxWaitMs, 100L));
+        }
+
+        if (!testCondition.conditionMet()) {
+            conditionDetails = conditionDetails != null ? conditionDetails : "";
+            throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 08406d1..15469c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -123,7 +123,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index bf01cbc..249e1ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -149,7 +149,7 @@ public class JoinIntegrationTest {
         // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
         // accordingly.
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-                                 TestUtils.tempDir().getPath());
+                                 TestUtils.tempDirectory().getPath());
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
index b381251..36340b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -80,7 +80,7 @@ public class KGroupedStreamIntegrationTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         KeyValueMapper<Integer, String, String>
             mapper =

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 50ad84c..9aaafe6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -83,7 +83,7 @@ public class KStreamRepartitionJoinTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 02f971e..b0a1e96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -86,6 +86,7 @@ public class RegexSourceIntegrationTest {
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
+    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
 
 
     @BeforeClass
@@ -146,11 +147,12 @@ public class RegexSourceIntegrationTest {
         streamThreads[0] = testStreamThread;
         streams.start();
 
-        TestUtils.waitForCondition(tasksUpdated);
+        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
+        testStreamThread.streamTaskUpdated = false;
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(tasksUpdated);
+        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
 
@@ -193,11 +195,13 @@ public class RegexSourceIntegrationTest {
 
         streams.start();
 
-        TestUtils.waitForCondition(tasksUpdated);
+        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
+        //reset
+        testStreamThread.streamTaskUpdated = false;
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-        TestUtils.waitForCondition(tasksUpdated);
+        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
 
@@ -331,12 +335,7 @@ public class RegexSourceIntegrationTest {
         return new TestCondition() {
             @Override
             public boolean conditionMet() {
-                if (testStreamThread.streamTaskUpdated) {
-                    testStreamThread.streamTaskUpdated = false;
-                    return true;
-                } else {
-                    return false;
-                }
+                return testStreamThread.streamTaskUpdated;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 9d881e0..8ae55cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -200,13 +200,9 @@ public class IntegrationTestUtils {
             }
         };
 
-        TestUtils.waitForCondition(valuesRead, waitTime);
+        String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
 
-        if (accumData.size() < expectedNumRecords) {
-            throw new AssertionError("Expected " + expectedNumRecords +
-                    " but received only " + accumData.size() +
-                    " records before timeout " + waitTime + " ms");
-        }
+        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
 
         return accumData;
     }
@@ -243,16 +239,11 @@ public class IntegrationTestUtils {
             }
         };
 
-        TestUtils.waitForCondition(valuesRead, waitTime);
+        String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
 
-        if (accumData.size() < expectedNumRecords) {
-            throw new AssertionError("Expected " + expectedNumRecords +
-                    " but received only " + accumData.size() +
-                    " records before timeout " + waitTime + " ms");
-        }
+        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
 
         return accumData;
-
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 78dfa7b..d08780b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -66,7 +66,7 @@ public class ProcessorTopologyTest {
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
-        File localState = TestUtils.tempDir();
+        File localState = TestUtils.tempDirectory();
         Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index ab274f2..6d990b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -211,7 +211,7 @@ public class KeyValueStoreTestDriver<K, V> {
                 send(record, keySerializer, valueSerializer);
             }
         };
-        this.stateDir = TestUtils.tempDir();
+        this.stateDir = TestUtils.tempDirectory();
         this.stateDir.mkdirs();
 
         Properties props = new Properties();