You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/15 19:40:42 UTC
kafka git commit: KAFKA-6215: fix transient failures in KafkaStreamsTest
Repository: kafka
Updated Branches:
refs/heads/trunk f300480f8 -> 1756a22f7
KAFKA-6215: fix transient failures in KafkaStreamsTest
- set streams state.dir to test-dir (default /tmp is not reliable)
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ted Yu <yu...@gmail.com>
Closes #4221 from mjsax/minor-fix-instable-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1756a22f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1756a22f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1756a22f
Branch: refs/heads/trunk
Commit: 1756a22f7923766175ee4f90bed074cf1b60f932
Parents: f300480
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 15 11:40:39 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 15 11:40:39 2017 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/KafkaStreamsTest.java | 46 ++------------------
1 file changed, 4 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1756a22f/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 69b4584..dd3b9af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
@@ -300,7 +299,9 @@ public class KafkaStreamsTest {
@Test
public void testNumberDefaultMetrics() {
- final KafkaStreams streams = createKafkaStreams();
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final Map<MetricName, ? extends Metric> metrics = streams.metrics();
// all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
assertEquals(23, metrics.size());
@@ -308,9 +309,6 @@ public class KafkaStreamsTest {
@Test
public void testIllegalMetricsConfig() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
final StreamsBuilder builder = new StreamsBuilder();
@@ -322,9 +320,6 @@ public class KafkaStreamsTest {
@Test
public void testLegalMetricsConfig() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
final StreamsBuilder builder1 = new StreamsBuilder();
final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props);
@@ -364,11 +359,6 @@ public class KafkaStreamsTest {
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
final AtomicBoolean keepRunning = new AtomicBoolean(true);
try {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
final StreamsBuilder builder = new StreamsBuilder();
final CountDownLatch latch = new CountDownLatch(1);
final String topic = "input";
@@ -409,11 +399,7 @@ public class KafkaStreamsTest {
@Test
public void shouldReturnThreadMetadata() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
- props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
@@ -427,22 +413,8 @@ public class KafkaStreamsTest {
streams.close();
}
-
- private KafkaStreams createKafkaStreams() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
- final StreamsBuilder builder = new StreamsBuilder();
- return new KafkaStreams(builder.build(), props);
- }
-
@Test
public void testCleanup() {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -454,10 +426,6 @@ public class KafkaStreamsTest {
@Test
public void testCannotCleanupWhileRunning() throws InterruptedException {
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -492,13 +460,7 @@ public class KafkaStreamsTest {
@Test
public void shouldCleanupOldStateDirs() throws InterruptedException {
- final Properties props = new Properties();
- final String appId = "cleanupOldStateDirs";
- final String stateDir = TestUtils.tempDirectory().getPath();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
final String topic = "topic";
@@ -518,7 +480,7 @@ public class KafkaStreamsTest {
}
}
});
- final String appDir = stateDir + File.separator + appId;
+ final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
final File oldTaskDir = new File(appDir, "10_1");
assertTrue(oldTaskDir.mkdirs());
try {