You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/29 22:17:12 UTC
kafka git commit: MINOR: Follow-up from KAFKA-3842 to fix tempDir
Repository: kafka
Updated Branches:
refs/heads/trunk cf03f349c -> 96ae0a592
MINOR: Follow-up from KAFKA-3842 to fix tempDir
…p directories, waitForCondition
Author: bbejeck <bb...@gmail.com>
Reviewers: Guozhang Wang, Ismael Juma
Closes #1554 from bbejeck/follow_up_for_KAFKA-3842
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ae0a59
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ae0a59
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ae0a59
Branch: refs/heads/trunk
Commit: 96ae0a5929bcd784dc632ddadf083bbedababbb6
Parents: cf03f34
Author: Bill Bejeck <bb...@gmail.com>
Authored: Wed Jun 29 15:17:04 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 29 15:17:04 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/test/TestUtils.java | 50 ++++++++++++--------
.../InternalTopicIntegrationTest.java | 2 +-
.../integration/JoinIntegrationTest.java | 2 +-
.../KGroupedStreamIntegrationTest.java | 2 +-
.../integration/KStreamRepartitionJoinTest.java | 2 +-
.../integration/RegexSourceIntegrationTest.java | 19 ++++----
.../integration/utils/IntegrationTestUtils.java | 17 ++-----
.../internals/ProcessorTopologyTest.java | 2 +-
.../streams/state/KeyValueStoreTestDriver.java | 2 +-
9 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 372954a..e06aece 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -123,20 +123,17 @@ public class TestUtils {
*
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
*/
- public static File tempDirectory(String prefix) throws IOException {
+ public static File tempDirectory(String prefix) {
return tempDirectory(null, prefix);
}
/**
- * Create a temporary directory named "test" under /temp
+ * Create a temporary relative directory in the default temporary-file directory with a
+ * prefix of "kafka-"
* @return the temporary directory just created.
*/
- public static File tempDir() {
- try {
- return tempDirectory(new File("/tmp").toPath(), "test");
- } catch (IOException ex) {
- throw new RuntimeException("Failed to create a temp dir", ex);
- }
+ public static File tempDirectory() {
+ return tempDirectory(null);
}
/**
@@ -145,10 +142,15 @@ public class TestUtils {
* @param parent The parent folder path name, if null using the default temporary-file directory
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
*/
- public static File tempDirectory(Path parent, String prefix) throws IOException {
- final File file = parent == null ?
- Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() :
- Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile();
+ public static File tempDirectory(Path parent, String prefix) {
+ final File file;
+ prefix = prefix == null ? "kafka-" : prefix;
+ try {
+ file = parent == null ?
+ Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile();
+ } catch (IOException ex) {
+ throw new RuntimeException("Failed to create a temp dir", ex);
+ }
file.deleteOnExit();
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -225,21 +227,29 @@ public class TestUtils {
}
/**
- * uses default value of 30 seconds for timeout
+ * uses default value of 15 seconds for timeout
*/
- public static void waitForCondition(TestCondition testCondition) throws InterruptedException {
- waitForCondition(testCondition, 30000);
+ public static void waitForCondition(TestCondition testCondition, String conditionDetails) throws InterruptedException {
+ waitForCondition(testCondition, 15000, conditionDetails);
}
/**
- * Used to wait for specific conditions/state to be me during a test
- * this is meant to be a replacement for using Thread.sleep
+ * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise.
+ * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
+ * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
+ * avoid transient failures due to slow or overloaded machines.
*/
- public static void waitForCondition(TestCondition testCondition, long maxTimeMillis) throws InterruptedException {
+ public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
long startTime = System.currentTimeMillis();
- while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) {
- Thread.sleep(Math.min(maxTimeMillis, 100L));
+
+ while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
+ Thread.sleep(Math.min(maxWaitMs, 100L));
+ }
+
+ if (!testCondition.conditionMet()) {
+ conditionDetails = conditionDetails != null ? conditionDetails : "";
+ throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 08406d1..15469c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -123,7 +123,7 @@ public class InternalTopicIntegrationTest {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index bf01cbc..249e1ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -149,7 +149,7 @@ public class JoinIntegrationTest {
// with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
// accordingly.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
- TestUtils.tempDir().getPath());
+ TestUtils.tempDirectory().getPath());
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
index b381251..36340b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -80,7 +80,7 @@ public class KGroupedStreamIntegrationTest {
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
KeyValueMapper<Integer, String, String>
mapper =
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 50ad84c..9aaafe6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -83,7 +83,7 @@ public class KStreamRepartitionJoinTest {
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 02f971e..b0a1e96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -86,6 +86,7 @@ public class RegexSourceIntegrationTest {
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
private Properties streamsConfiguration;
+ private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
@BeforeClass
@@ -146,11 +147,12 @@ public class RegexSourceIntegrationTest {
streamThreads[0] = testStreamThread;
streams.start();
- TestUtils.waitForCondition(tasksUpdated);
+ TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
+ testStreamThread.streamTaskUpdated = false;
CLUSTER.createTopic("TEST-TOPIC-2");
- TestUtils.waitForCondition(tasksUpdated);
+ TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
streams.close();
@@ -193,11 +195,13 @@ public class RegexSourceIntegrationTest {
streams.start();
- TestUtils.waitForCondition(tasksUpdated);
+ TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
+ //reset
+ testStreamThread.streamTaskUpdated = false;
CLUSTER.deleteTopic("TEST-TOPIC-A");
- TestUtils.waitForCondition(tasksUpdated);
+ TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
streams.close();
@@ -331,12 +335,7 @@ public class RegexSourceIntegrationTest {
return new TestCondition() {
@Override
public boolean conditionMet() {
- if (testStreamThread.streamTaskUpdated) {
- testStreamThread.streamTaskUpdated = false;
- return true;
- } else {
- return false;
- }
+ return testStreamThread.streamTaskUpdated;
}
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 9d881e0..8ae55cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -200,13 +200,9 @@ public class IntegrationTestUtils {
}
};
- TestUtils.waitForCondition(valuesRead, waitTime);
+ String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
- if (accumData.size() < expectedNumRecords) {
- throw new AssertionError("Expected " + expectedNumRecords +
- " but received only " + accumData.size() +
- " records before timeout " + waitTime + " ms");
- }
+ TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
return accumData;
}
@@ -243,16 +239,11 @@ public class IntegrationTestUtils {
}
};
- TestUtils.waitForCondition(valuesRead, waitTime);
+ String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
- if (accumData.size() < expectedNumRecords) {
- throw new AssertionError("Expected " + expectedNumRecords +
- " but received only " + accumData.size() +
- " records before timeout " + waitTime + " ms");
- }
+ TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
return accumData;
-
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 78dfa7b..d08780b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -66,7 +66,7 @@ public class ProcessorTopologyTest {
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
- File localState = TestUtils.tempDir();
+ File localState = TestUtils.tempDirectory();
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ae0a59/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index ab274f2..6d990b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -211,7 +211,7 @@ public class KeyValueStoreTestDriver<K, V> {
send(record, keySerializer, valueSerializer);
}
};
- this.stateDir = TestUtils.tempDir();
+ this.stateDir = TestUtils.tempDirectory();
this.stateDir.mkdirs();
Properties props = new Properties();