You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/11/22 10:55:53 UTC
kafka git commit: MINOR: improve flaky Streams tests
Repository: kafka
Updated Branches:
refs/heads/0.11.0 8ea4a2826 -> c89c6b873
MINOR: improve flaky Streams tests
Use a TestUtils temporary directory as the state directory instead of the default /tmp/kafka-streams
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>
Closes #4246 from mjsax/improve-flaky-streams-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c89c6b87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c89c6b87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c89c6b87
Branch: refs/heads/0.11.0
Commit: c89c6b87365c8a8482e1ddac23079af7f9faff0c
Parents: 8ea4a28
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 22 10:55:42 2017 +0000
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Nov 22 10:55:42 2017 +0000
----------------------------------------------------------------------
.../apache/kafka/streams/KafkaStreamsTest.java | 81 ++++----------------
.../integration/FanoutIntegrationTest.java | 2 +
2 files changed, 16 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c89c6b87/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 8eea60c..064d7b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
@@ -63,6 +62,7 @@ public class KafkaStreamsTest {
// quick enough)
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
private final KStreamBuilder builder = new KStreamBuilder();
private KafkaStreams streams;
private Properties props;
@@ -80,9 +80,6 @@ public class KafkaStreamsTest {
@Test
public void testStateChanges() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
-
StateListenerStub stateListener = new StateListenerStub();
streams.setStateListener(stateListener);
Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED);
@@ -101,9 +98,6 @@ public class KafkaStreamsTest {
@Test
public void testStateCloseAfterCreate() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
-
StateListenerStub stateListener = new StateListenerStub();
streams.setStateListener(stateListener);
streams.close();
@@ -159,25 +153,20 @@ public class KafkaStreamsTest {
@Test
public void testStateThreadClose() throws Exception {
- final int numThreads = 2;
- final KStreamBuilder builder = new KStreamBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
- final KafkaStreams streams = new KafkaStreams(builder, props);
- testStateThreadCloseHelper(numThreads);
+ streams = new KafkaStreams(new KStreamBuilder(), props);
+
+ testStateThreadCloseHelper(NUM_THREADS);
}
@Test
public void testStateGlobalThreadClose() throws Exception {
- final int numThreads = 2;
final KStreamBuilder builder = new KStreamBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
- final KafkaStreams streams = new KafkaStreams(builder, props);
-
+ streams = new KafkaStreams(builder, props);
streams.start();
TestUtils.waitForCondition(new TestCondition() {
@@ -260,7 +249,8 @@ public class KafkaStreamsTest {
@Test
public void testNumberDefaultMetrics() {
- final KafkaStreams streams = createKafkaStreams();
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+ streams = new KafkaStreams(builder, props);
final Map<MetricName, ? extends Metric> metrics = streams.metrics();
// all 15 default StreamThread metrics + 1 metric that keeps track of number of metrics
assertEquals(metrics.size(), 16);
@@ -268,11 +258,7 @@ public class KafkaStreamsTest {
@Test
public void testIllegalMetricsConfig() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
- final KStreamBuilder builder = new KStreamBuilder();
try {
new KafkaStreams(builder, props);
@@ -282,17 +268,12 @@ public class KafkaStreamsTest {
@Test
public void testLegalMetricsConfig() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
- final KStreamBuilder builder1 = new KStreamBuilder();
- final KafkaStreams streams1 = new KafkaStreams(builder1, props);
+ final KafkaStreams streams1 = new KafkaStreams(builder, props);
streams1.close();
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
- final KStreamBuilder builder2 = new KStreamBuilder();
- final KafkaStreams streams2 = new KafkaStreams(builder2, props);
+ new KafkaStreams(builder, props);
}
@@ -325,11 +306,6 @@ public class KafkaStreamsTest {
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
final AtomicBoolean keepRunning = new AtomicBoolean(true);
try {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
final KStreamBuilder builder = new KStreamBuilder();
final CountDownLatch latch = new CountDownLatch(1);
final String topic = "input";
@@ -349,7 +325,8 @@ public class KafkaStreamsTest {
}
}
});
- final KafkaStreams streams = new KafkaStreams(builder, props);
+
+ streams = new KafkaStreams(builder, props);
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
Collections.singletonList(new KeyValue<>("A", "A")),
@@ -369,24 +346,8 @@ public class KafkaStreamsTest {
}
- private KafkaStreams createKafkaStreams() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
- final KStreamBuilder builder = new KStreamBuilder();
- return new KafkaStreams(builder, props);
- }
-
@Test
public void testCleanup() throws Exception {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
-
streams.cleanUp();
streams.start();
streams.close();
@@ -395,13 +356,6 @@ public class KafkaStreamsTest {
@Test
public void testCannotCleanupWhileRunning() throws Exception {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
-
streams.start();
try {
streams.cleanUp();
@@ -428,22 +382,15 @@ public class KafkaStreamsTest {
@Test
public void shouldCleanupOldStateDirs() throws InterruptedException {
- final Properties props = new Properties();
- final String appId = "cleanupOldStateDirs";
- final String stateDir = TestUtils.tempDirectory().getPath();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
-
final String topic = "topic";
CLUSTER.createTopic(topic);
- final KStreamBuilder builder = new KStreamBuilder();
+ final KStreamBuilder builder = new KStreamBuilder();
builder.table(Serdes.String(), Serdes.String(), topic, topic);
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
streams.setStateListener(new KafkaStreams.StateListener() {
@Override
@@ -453,7 +400,7 @@ public class KafkaStreamsTest {
}
}
});
- final String appDir = stateDir + File.separator + appId;
+ final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
final File oldTaskDir = new File(appDir, "10_1");
assertTrue(oldTaskDir.mkdirs());
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c89c6b87/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index f1f09a4..5f3be85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -101,6 +102,7 @@ public class FanoutIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");