You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/24 20:42:57 UTC
kafka git commit: KAFKA-3842: consolidate utility methods to TestUtils
Repository: kafka
Updated Branches:
refs/heads/trunk ef42c224a -> 88924b03d
KAFKA-3842: consolidate utility methods to TestUtils
…TestUtils, added method for pausing tests to TestUtils
Changes made:
1. Added utility method for creating consumer configs.
2. Added methods for creating producer, consumer configs with default values for de/serializers.
3. Pulled out method for waiting for test state to TestUtils (not using Thread.sleep).
4. Added utility class for creating streams configs and methods providing default de/serializers.
Author: bbejeck <bb...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1532 from bbejeck/KAFKA_3842_add_helper_functions_test_utils
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/88924b03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/88924b03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/88924b03
Branch: refs/heads/trunk
Commit: 88924b03d511132212ee5d8dec02063deb8313f1
Parents: ef42c22
Author: Bill Bejeck <bb...@gmail.com>
Authored: Fri Jun 24 13:42:53 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 24 13:42:53 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/test/TestCondition.java | 26 ++++++
.../java/org/apache/kafka/test/TestUtils.java | 65 ++++++++++++++
.../InternalTopicIntegrationTest.java | 4 +-
.../integration/JoinIntegrationTest.java | 4 +-
.../KGroupedStreamIntegrationTest.java | 3 +-
.../integration/KStreamRepartitionJoinTest.java | 3 +-
.../integration/RegexSourceIntegrationTest.java | 92 ++++++++------------
.../integration/utils/IntegrationTestUtils.java | 69 +++++++++------
.../internals/ProcessorTopologyTest.java | 4 +-
.../streams/state/KeyValueStoreTestDriver.java | 3 +-
.../kafka/streams/state/StateTestUtils.java | 79 -----------------
.../org/apache/kafka/test/StreamsTestUtils.java | 56 ++++++++++++
12 files changed, 236 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/clients/src/test/java/org/apache/kafka/test/TestCondition.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestCondition.java b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
new file mode 100644
index 0000000..f78c91b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.test;
+
+/**
+ * Interface to wrap actions that are required to wait until a condition is met
+ * for testing purposes. Note that this is not intended to do any assertions.
+ */
+public interface TestCondition {
+
+ boolean conditionMet();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index a818d53..372954a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -29,7 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -126,6 +128,18 @@ public class TestUtils {
}
/**
+ * Create a temporary directory with the prefix "test" under /tmp
+ * @return the temporary directory just created.
+ */
+ public static File tempDir() {
+ try {
+ return tempDirectory(new File("/tmp").toPath(), "test");
+ } catch (IOException ex) {
+ throw new RuntimeException("Failed to create a temp dir", ex);
+ }
+ }
+
+ /**
* Create a temporary relative directory in the specified parent directory with the given prefix.
*
* @param parent The parent folder path name, if null using the default temporary-file directory
@@ -178,4 +192,55 @@ public class TestUtils {
properties.putAll(additional);
return properties;
}
+
+ public static Properties producerConfig(final String bootstrapServers, Class keySerializer, Class valueSerializer) {
+ return producerConfig(bootstrapServers, keySerializer, valueSerializer, new Properties());
+ }
+
+ public static Properties consumerConfig(final String bootstrapServers,
+ final String groupId,
+ final Class keyDeserializer,
+ final Class valueDeserializer,
+ final Properties additional) {
+
+ final Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+ consumerConfig.putAll(additional);
+ return consumerConfig;
+ }
+
+ /**
+ * returns consumer config with random UUID for the Group ID
+ */
+ public static Properties consumerConfig(final String bootstrapServers, Class keyDeserializer, Class valueDeserializer) {
+ return consumerConfig(bootstrapServers,
+ UUID.randomUUID().toString(),
+ keyDeserializer,
+ valueDeserializer,
+ new Properties());
+ }
+
+ /**
+ * uses default value of 30 seconds for timeout
+ */
+ public static void waitForCondition(TestCondition testCondition) throws InterruptedException {
+ waitForCondition(testCondition, 30000);
+ }
+
+ /**
+ * Used to wait for specific conditions/state to be met during a test
+ * this is meant to be a replacement for using Thread.sleep
+ */
+ public static void waitForCondition(TestCondition testCondition, long maxTimeMillis) throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+
+ while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) {
+ Thread.sleep(Math.min(maxTimeMillis, 100L));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index addebae..08406d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.StateTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -123,7 +123,7 @@ public class InternalTopicIntegrationTest {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index f251a85..bf01cbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.StateTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -149,7 +149,7 @@ public class JoinIntegrationTest {
// with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
// accordingly.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
- StateTestUtils.tempDir().getPath());
+ TestUtils.tempDir().getPath());
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
index 1ec6573..b381251 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.StateTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -81,7 +80,7 @@ public class KGroupedStreamIntegrationTest {
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
KeyValueMapper<Integer, String, String>
mapper =
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index c852513..7dabc33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.StateTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -81,7 +80,7 @@ public class KStreamRepartitionJoinTest {
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index cf48391..02f971e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -16,8 +16,6 @@
package org.apache.kafka.streams.integration;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
@@ -39,6 +37,9 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -59,6 +60,7 @@ import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
* End-to-end integration test based on using regex and named topics for creating sources, using
@@ -82,6 +84,7 @@ public class RegexSourceIntegrationTest {
private static final int SECOND_UPDATE = 1;
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+ private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
private Properties streamsConfiguration;
@@ -100,7 +103,10 @@ public class RegexSourceIntegrationTest {
@Before
public void setUp() {
- streamsConfiguration = getStreamsConfig();
+
+ streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(),
+ STRING_SERDE_CLASSNAME,
+ STRING_SERDE_CLASSNAME);
}
@After
@@ -135,13 +141,16 @@ public class RegexSourceIntegrationTest {
new DefaultKafkaClientSupplier(),
originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+ TestCondition tasksUpdated = createTasksUpdatedCondition(testStreamThread);
+
streamThreads[0] = testStreamThread;
streams.start();
- testStreamThread.waitUntilTasksUpdated();
+
+ TestUtils.waitForCondition(tasksUpdated);
CLUSTER.createTopic("TEST-TOPIC-2");
- testStreamThread.waitUntilTasksUpdated();
+ TestUtils.waitForCondition(tasksUpdated);
streams.close();
@@ -180,13 +189,15 @@ public class RegexSourceIntegrationTest {
originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
streamThreads[0] = testStreamThread;
+ TestCondition tasksUpdated = createTasksUpdatedCondition(testStreamThread);
+
streams.start();
- testStreamThread.waitUntilTasksUpdated();
+ TestUtils.waitForCondition(tasksUpdated);
CLUSTER.deleteTopic("TEST-TOPIC-A");
- testStreamThread.waitUntilTasksUpdated();
+ TestUtils.waitForCondition(tasksUpdated);
streams.close();
@@ -224,7 +235,7 @@ public class RegexSourceIntegrationTest {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- Properties producerConfig = getProducerConfig();
+ Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig);
IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig);
@@ -233,7 +244,7 @@ public class RegexSourceIntegrationTest {
IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig);
IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig);
- Properties consumerConfig = getConsumerConfig();
+ Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
@@ -272,61 +283,25 @@ public class RegexSourceIntegrationTest {
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-
- // Remove any state from previous test runs
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- Properties producerConfig = getProducerConfig();
+ Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig);
IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig);
- Properties consumerConfig = getConsumerConfig();
+ Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
try {
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+ fail("Should not get here");
} finally {
streams.close();
}
}
- private Properties getProducerConfig() {
- Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return producerConfig;
- }
-
- private Properties getStreamsConfig() {
- Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-source-integration-test");
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
- streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
- streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- return streamsConfiguration;
- }
-
- private Properties getConsumerConfig() {
- Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-source-integration-consumer");
- consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- return consumerConfig;
- }
-
private class TestStreamThread extends StreamThread {
public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
@@ -349,16 +324,21 @@ public class RegexSourceIntegrationTest {
return super.createStreamTask(id, partitions);
}
+ }
- void waitUntilTasksUpdated() {
- long maxTimeMillis = 30000;
- long startTime = System.currentTimeMillis();
- while (!streamTaskUpdated && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) {
- //empty loop just waiting for update
- }
- streamTaskUpdated = false;
- }
+ private TestCondition createTasksUpdatedCondition(final TestStreamThread testStreamThread) {
+ return new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ if (testStreamThread.streamTaskUpdated) {
+ testStreamThread.streamTaskUpdated = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 83b431c..9d881e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.io.IOException;
@@ -183,23 +185,30 @@ public class IntegrationTestUtils {
* @throws InterruptedException
* @throws AssertionError if the given wait time elapses
*/
- public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
- String topic,
- int expectedNumRecords,
+ public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords,
long waitTime) throws InterruptedException {
- List<KeyValue<K, V>> accumData = new ArrayList<>();
- long startTime = System.currentTimeMillis();
- while (true) {
- List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
- accumData.addAll(readData);
- if (accumData.size() >= expectedNumRecords)
- return accumData;
- if (System.currentTimeMillis() > startTime + waitTime)
- throw new AssertionError("Expected " + expectedNumRecords +
+ final List<KeyValue<K, V>> accumData = new ArrayList<>();
+
+ TestCondition valuesRead = new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
+ }
+ };
+
+ TestUtils.waitForCondition(valuesRead, waitTime);
+
+ if (accumData.size() < expectedNumRecords) {
+ throw new AssertionError("Expected " + expectedNumRecords +
" but received only " + accumData.size() +
" records before timeout " + waitTime + " ms");
- Thread.sleep(Math.min(waitTime, 100L));
}
+
+ return accumData;
}
public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
@@ -219,23 +228,31 @@ public class IntegrationTestUtils {
* @throws InterruptedException
* @throws AssertionError if the given wait time elapses
*/
- public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
- String topic,
- int expectedNumRecords,
+ public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords,
long waitTime) throws InterruptedException {
- List<V> accumData = new ArrayList<>();
- long startTime = System.currentTimeMillis();
- while (true) {
- List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
- accumData.addAll(readData);
- if (accumData.size() >= expectedNumRecords)
- return accumData;
- if (System.currentTimeMillis() > startTime + waitTime)
- throw new AssertionError("Expected " + expectedNumRecords +
+ final List<V> accumData = new ArrayList<>();
+
+ TestCondition valuesRead = new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
+ }
+ };
+
+ TestUtils.waitForCondition(valuesRead, waitTime);
+
+ if (accumData.size() < expectedNumRecords) {
+ throw new AssertionError("Expected " + expectedNumRecords +
" but received only " + accumData.size() +
" records before timeout " + waitTime + " ms");
- Thread.sleep(Math.min(waitTime, 100L));
}
+
+ return accumData;
+
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 62b283a..78dfa7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -38,10 +38,10 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -66,7 +66,7 @@ public class ProcessorTopologyTest {
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
- File localState = StateTestUtils.tempDir();
+ File localState = TestUtils.tempDir();
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index be5596d..ab274f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.util.HashMap;
@@ -210,7 +211,7 @@ public class KeyValueStoreTestDriver<K, V> {
send(record, keySerializer, valueSerializer);
}
};
- this.stateDir = StateTestUtils.tempDir();
+ this.stateDir = TestUtils.tempDir();
this.stateDir.mkdirs();
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
deleted file mode 100644
index f348fc9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A utility for tests to create and manage unique and isolated directories on the file system for local state.
- */
-public class StateTestUtils {
-
- private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();
-
- /**
- * Create a new temporary directory that will be cleaned up automatically upon shutdown.
- * @return the new directory that will exist; never null
- */
- public static File tempDir() {
- try {
- final File dir = Files.createTempDirectory(new File("/tmp").toPath(), "test").toFile();
- dir.mkdirs();
- dir.deleteOnExit();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- deleteDirectory(dir);
- }
- });
- return dir;
- } catch (IOException ex) {
- throw new RuntimeException("Failed to create a temp dir", ex);
- }
- }
-
- private static void deleteDirectory(File dir) {
- if (dir != null && dir.exists()) {
- try {
- Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
- Files.delete(dir);
- return FileVisitResult.CONTINUE;
- }
-
- });
- } catch (IOException e) {
- // do nothing
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
new file mode 100644
index 0000000..5a7bfa7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.test;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+import java.util.UUID;
+
+public class StreamsTestUtils {
+
+ public static Properties getStreamsConfig(final String applicationId,
+ final String bootstrapServers,
+ final String keySerdeClassName,
+ final String valueSerdeClassName,
+ final Properties additional) {
+
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/" + applicationId);
+ streamsConfiguration.putAll(additional);
+ return streamsConfiguration;
+
+ }
+
+ /**
+ * Streams configuration with a randomly generated UUID for the application id
+ */
+ public static Properties getStreamsConfig(String bootstrapServer, String keySerdeClassName, String valueSerdeClassName) {
+ return getStreamsConfig(UUID.randomUUID().toString(),
+ bootstrapServer,
+ keySerdeClassName,
+ valueSerdeClassName,
+ new Properties());
+ }
+
+}