You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:07 UTC
[4/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate
KStreamBuilder
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index e6c0d6e..ad0f236 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -149,7 +148,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(null, name, null, null, null, topics);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -173,7 +172,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -197,7 +196,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -224,7 +223,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -247,7 +246,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -272,7 +271,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -298,7 +297,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -326,7 +325,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -344,7 +343,7 @@ public class TopologyBuilder {
* if not specified the default value deserializer defined in the configs will be used
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
public synchronized final TopologyBuilder addSource(final String name,
final Deserializer keyDeserializer,
@@ -353,7 +352,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -374,7 +373,7 @@ public class TopologyBuilder {
* if not specified the default value deserializer defined in the configs will be used
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
@@ -385,7 +384,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -422,7 +421,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -461,7 +460,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -482,7 +481,7 @@ public class TopologyBuilder {
* if not specified the default value deserializer defined in the configs will be used
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
public synchronized final TopologyBuilder addSource(final String name,
final Deserializer keyDeserializer,
@@ -491,7 +490,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -515,7 +514,7 @@ public class TopologyBuilder {
* if not specified the default value deserializer defined in the configs will be used
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
@@ -526,7 +525,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -548,7 +547,7 @@ public class TopologyBuilder {
* if not specified the default value deserializer defined in the configs will be used
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
@@ -558,7 +557,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -584,7 +583,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -619,7 +618,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -651,7 +650,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -675,7 +674,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @throws TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
*/
public synchronized final <K, V> TopologyBuilder addSink(final String name,
final String topic,
@@ -686,7 +685,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -699,7 +698,7 @@ public class TopologyBuilder {
* @param predecessorNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
*/
public synchronized final TopologyBuilder addProcessor(final String name,
final ProcessorSupplier supplier,
@@ -707,7 +706,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -717,14 +716,14 @@ public class TopologyBuilder {
*
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if state store supplier is already added
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if state store supplier is already added
*/
public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
final String... processorNames) {
try {
internalTopologyBuilder.addStateStore(supplier, processorNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
return this;
}
@@ -742,7 +741,7 @@ public class TopologyBuilder {
try {
internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
} catch (final TopologyException e) {
- throw new TopologyBuilderException(e);
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
}
return this;
@@ -769,7 +768,7 @@ public class TopologyBuilder {
*
* @param processorNames the name of the processors
* @return this builder instance so methods can be chained together; never null
- * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
+ * @throws org.apache.kafka.streams.errors.TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
*/
public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
internalTopologyBuilder.connectProcessors(processorNames);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index f9ae216..ebdd64d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
@@ -661,7 +660,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
continue;
}
if (numPartitions < 0) {
- throw new TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
}
topicsToMakeReady.put(topic, numPartitions);
@@ -786,7 +785,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
final Integer partitions = metadata.partitionCountForTopic(topic);
if (partitions == null) {
- throw new TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
}
if (numPartitions == UNKNOWN) {
@@ -794,7 +793,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
} else if (numPartitions != partitions) {
final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
}
} else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
numPartitions = NOT_AVAILABLE;
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a6532df..c6accde 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -64,7 +63,7 @@ public class KafkaStreamsTest {
// quick enough)
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- private final KStreamBuilder builder = new KStreamBuilder();
+ private final StreamsBuilder builder = new StreamsBuilder();
private KafkaStreams streams;
private Properties props;
@@ -76,13 +75,13 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
- streams = new KafkaStreams(builder, props);
+ streams = new KafkaStreams(builder.build(), props);
}
@Test
public void testStateChanges() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final StateListenerStub stateListener = new StateListenerStub();
streams.setStateListener(stateListener);
@@ -102,8 +101,8 @@ public class KafkaStreamsTest {
@Test
public void testStateCloseAfterCreate() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final StateListenerStub stateListener = new StateListenerStub();
streams.setStateListener(stateListener);
@@ -114,11 +113,11 @@ public class KafkaStreamsTest {
@Test
public void testStateThreadClose() throws Exception {
final int numThreads = 2;
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -171,11 +170,11 @@ public class KafkaStreamsTest {
@Test
public void testStateGlobalThreadClose() throws Exception {
final int numThreads = 2;
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
@@ -207,8 +206,8 @@ public class KafkaStreamsTest {
@Test
public void testInitializesAndDestroysMetricsReporters() throws Exception {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
final int initDiff = newInitCount - oldInitCount;
assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -310,8 +309,8 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
- final KStreamBuilder builder = new KStreamBuilder();
- new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ new KafkaStreams(builder.build(), props);
}
@Test
@@ -320,13 +319,13 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
- final KStreamBuilder builder1 = new KStreamBuilder();
- final KafkaStreams streams1 = new KafkaStreams(builder1, props);
+ final StreamsBuilder builder1 = new StreamsBuilder();
+ final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props);
streams1.close();
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
- final KStreamBuilder builder2 = new KStreamBuilder();
- new KafkaStreams(builder2, props);
+ final StreamsBuilder builder2 = new StreamsBuilder();
+ new KafkaStreams(builder2.build(), props);
}
@Test(expected = IllegalStateException.class)
@@ -363,7 +362,7 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final CountDownLatch latch = new CountDownLatch(1);
final String topic = "input";
CLUSTER.createTopic(topic);
@@ -382,7 +381,7 @@ public class KafkaStreamsTest {
}
}
});
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
Collections.singletonList(new KeyValue<>("A", "A")),
@@ -407,8 +406,8 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- final KStreamBuilder builder = new KStreamBuilder();
- return new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ return new KafkaStreams(builder.build(), props);
}
@Test
@@ -417,8 +416,8 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
streams.start();
@@ -432,8 +431,8 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
TestUtils.waitForCondition(new TestCondition() {
@@ -477,11 +476,11 @@ public class KafkaStreamsTest {
final String topic = "topic";
CLUSTER.createTopic(topic);
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
builder.stream(Serdes.String(), Serdes.String(), topic);
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
streams.setStateListener(new KafkaStreams.StateListener() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
new file mode 100644
index 0000000..b0a0743
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamsBuilderTest {
+
+ private final StreamsBuilder builder = new StreamsBuilder();
+
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
+ @Test(expected = TopologyException.class)
+ public void testFrom() {
+ builder.stream("topic-1", "topic-2");
+
+ builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
+ }
+
+ @Test
+ public void shouldProcessingFromSinkTopic() {
+ final KStream<String, String> source = builder.stream("topic-source");
+ source.to("topic-sink");
+
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+
+ source.process(processorSupplier);
+
+ driver = new KStreamTestDriver(builder);
+ driver.setTime(0L);
+
+ driver.process("topic-source", "A", "aa");
+
+ // no exception was thrown
+ assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+ }
+
+ @Test
+ public void shouldProcessViaThroughTopic() {
+ final KStream<String, String> source = builder.stream("topic-source");
+ final KStream<String, String> through = source.through("topic-sink");
+
+ final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+
+ source.process(sourceProcessorSupplier);
+ through.process(throughProcessorSupplier);
+
+ driver = new KStreamTestDriver(builder);
+ driver.setTime(0L);
+
+ driver.process("topic-source", "A", "aa");
+
+ assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+ }
+
+ @Test
+ public void testMerge() {
+ final String topic1 = "topic-1";
+ final String topic2 = "topic-2";
+
+ final KStream<String, String> source1 = builder.stream(topic1);
+ final KStream<String, String> source2 = builder.stream(topic2);
+ final KStream<String, String> merged = builder.merge(source1, source2);
+
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ merged.process(processorSupplier);
+
+ driver = new KStreamTestDriver(builder);
+ driver.setTime(0L);
+
+ driver.process(topic1, "A", "aa");
+ driver.process(topic2, "B", "bb");
+ driver.process(topic2, "C", "cc");
+ driver.process(topic1, "D", "dd");
+
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+ }
+
+ @Test(expected = TopologyException.class)
+ public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+ builder.stream();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+ builder.stream(Serdes.String(), Serdes.String(), null, null);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index c163935..12947d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -280,6 +280,7 @@ public class TopologyTest {
@Override
public void process(Object key, Object value) { }
+ @SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) { }
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0c3b36a..fb93783 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -23,12 +23,12 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -144,7 +144,7 @@ public class EosIntegrationTest {
final String inputTopic,
final String throughTopic,
final String outputTopic) throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, Long> input = builder.stream(inputTopic);
KStream<Long, Long> output = input;
if (throughTopic != null) {
@@ -155,7 +155,7 @@ public class EosIntegrationTest {
for (int i = 0; i < numberOfRestarts; ++i) {
final long factor = i;
final KafkaStreams streams = new KafkaStreams(
- builder,
+ builder.build(),
StreamsTestUtils.getStreamsConfig(
applicationId,
CLUSTER.bootstrapServers(),
@@ -235,11 +235,11 @@ public class EosIntegrationTest {
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
final KafkaStreams streams = new KafkaStreams(
- builder,
+ builder.build(),
StreamsTestUtils.getStreamsConfig(
applicationId,
CLUSTER.bootstrapServers(),
@@ -578,7 +578,7 @@ public class EosIntegrationTest {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
injectGC = new AtomicBoolean(false);
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String[] storeNames = null;
if (withState) {
@@ -657,7 +657,7 @@ public class EosIntegrationTest {
.to(SINGLE_PARTITION_OUTPUT_TOPIC);
final KafkaStreams streams = new KafkaStreams(
- builder,
+ builder.build(),
StreamsTestUtils.getStreamsConfig(
applicationId,
CLUSTER.bootstrapServers(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index f1f09a4..733ca0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.IntegrationTest;
import org.junit.BeforeClass;
@@ -95,7 +95,7 @@ public class FanoutIntegrationTest {
//
// Step 1: Configure and start the processor topology.
//
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
@@ -122,7 +122,7 @@ public class FanoutIntegrationTest {
stream2.to(OUTPUT_TOPIC_B);
stream3.to(OUTPUT_TOPIC_C);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 869c255..ad0f1c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -23,13 +23,13 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -72,7 +72,7 @@ public class GlobalKTableIntegrationTest {
return value1 + "+" + value2;
}
};
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
private String globalOne;
@@ -88,7 +88,7 @@ public class GlobalKTableIntegrationTest {
@Before
public void before() throws InterruptedException {
testNo++;
- builder = new KStreamBuilder();
+ builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "globalOne-table-test-" + testNo;
@@ -227,7 +227,7 @@ public class GlobalKTableIntegrationTest {
}
private void startStreams() {
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 3658d10..587f478 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -30,11 +30,11 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -50,10 +50,10 @@ import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
-import java.util.Properties;
import java.util.Arrays;
-import java.util.Locale;
import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -139,7 +139,7 @@ public class InternalTopicIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
@@ -157,7 +157,7 @@ public class InternalTopicIntegrationTest {
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
@@ -185,7 +185,7 @@ public class InternalTopicIntegrationTest {
@Test
public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
@@ -203,7 +203,7 @@ public class InternalTopicIntegrationTest {
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index b69e58b..490c928 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -25,12 +25,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
@@ -68,7 +68,7 @@ public class JoinIntegrationTest {
private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
private final static Properties STREAMS_CONFIG = new Properties();
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private KStream<Long, String> leftStream;
private KStream<Long, String> rightStream;
private KTable<Long, String> leftTable;
@@ -127,7 +127,7 @@ public class JoinIntegrationTest {
public void prepareTopology() throws Exception {
CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
- builder = new KStreamBuilder();
+ builder = new StreamsBuilder();
leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
leftStream = leftTable.toStream();
@@ -154,7 +154,7 @@ public class JoinIntegrationTest {
assert expectedResult.size() == input.size();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
- final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
try {
streams.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 372b89c..14a3ea9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -25,12 +26,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
@@ -42,6 +43,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -51,9 +53,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import kafka.utils.MockTime;
-import org.junit.experimental.categories.Category;
-
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -72,7 +71,7 @@ public class KStreamAggregationDedupIntegrationTest {
private final MockTime mockTime = CLUSTER.time;
private static volatile int testNo = 0;
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
private String streamOneInput;
@@ -85,7 +84,7 @@ public class KStreamAggregationDedupIntegrationTest {
@Before
public void before() throws InterruptedException {
testNo++;
- builder = new KStreamBuilder();
+ builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
String applicationId = "kgrouped-stream-test-" +
@@ -287,7 +286,7 @@ public class KStreamAggregationDedupIntegrationTest {
}
private void startStreams() {
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 0d5472c..8adae06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -35,7 +36,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -81,7 +81,7 @@ public class KStreamAggregationIntegrationTest {
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
private String streamOneInput;
@@ -96,7 +96,7 @@ public class KStreamAggregationIntegrationTest {
@Before
public void before() throws InterruptedException {
testNo++;
- builder = new KStreamBuilder();
+ builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "kgrouped-stream-test-" + testNo;
@@ -666,7 +666,7 @@ public class KStreamAggregationIntegrationTest {
}
private void startStreams() {
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 5a17566..176a37c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -28,11 +28,11 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
@@ -190,7 +190,7 @@ public class KStreamKTableJoinIntegrationTest {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
// This KStream contains information such as "alice" -> 13L.
//
@@ -253,7 +253,7 @@ public class KStreamKTableJoinIntegrationTest {
// Write the (continuously updating) results to the output topic.
clicksPerRegion.to(stringSerde, longSerde, outputTopic);
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
//
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index d3ab176..b653647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -27,12 +27,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -44,7 +44,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-
import org.junit.experimental.categories.Category;
import java.io.IOException;
@@ -70,7 +69,7 @@ public class KStreamRepartitionJoinTest {
private final MockTime mockTime = CLUSTER.time;
private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private Properties streamsConfiguration;
private KStream<Long, Integer> streamOne;
private KStream<Integer, String> streamTwo;
@@ -89,7 +88,7 @@ public class KStreamRepartitionJoinTest {
public void before() throws InterruptedException {
testNo++;
String applicationId = "kstream-repartition-join-test-" + testNo;
- builder = new KStreamBuilder();
+ builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -356,7 +355,7 @@ public class KStreamRepartitionJoinTest {
private void startStreams() {
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 3fecfc1..d730c6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -28,8 +28,9 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -183,10 +184,10 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
final String outputTopic,
final List<String> expectedReceivedValues) throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
- final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
+ final KStream<String, String> pattern1Stream = builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
+ final KStream<String, String> pattern2Stream = builder.stream(Topology.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
pattern1Stream.to(stringSerde, stringSerde, outputTopic);
@@ -204,7 +205,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size());
@@ -255,14 +256,14 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
@Test
public void shouldThrowExceptionOverlappingTopic() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
//NOTE this would realistically get caught when building topology, the test is for completeness
- builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+ builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
try {
- builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
- fail("Should have thrown TopologyBuilderException");
- } catch (final TopologyBuilderException expected) { }
+ builder.stream(Topology.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
+ fail("Should have thrown TopologyException");
+ } catch (final org.apache.kafka.streams.errors.TopologyException expected) { }
}
@Test
@@ -277,12 +278,12 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
STRING_SERDE_CLASSNAME,
props);
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> exceptionStream = builder.stream(NOOP);
exceptionStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- KafkaStreams streams = new KafkaStreams(builder, localConfig);
+ KafkaStreams streams = new KafkaStreams(builder.build(), localConfig);
final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 4426a5e..9df1ef5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -337,7 +337,7 @@ public class KTableKTableJoinIntegrationTest {
}
private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
@@ -346,7 +346,7 @@ public class KTableKTableJoinIntegrationTest {
join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3,
joinType2, queryableName).to(OUTPUT);
- return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
+ return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig));
}
private KTable<String, String> join(final KTable<String, String> first,
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 3e6abfd..2a06ef4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -30,13 +30,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
@@ -189,7 +189,7 @@ public class QueryableStateIntegrationTest {
* Creates a typical word count topology
*/
private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final Serde<String> stringSerde = Serdes.String();
final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
@@ -208,7 +208,7 @@ public class QueryableStateIntegrationTest {
// Create a Windowed State Store that contains the word count for every 1 minute
groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
- return new KafkaStreams(builder, streamsConfiguration);
+ return new KafkaStreams(builder.build(), streamsConfiguration);
}
private class StreamRunnable implements Runnable {
@@ -416,7 +416,7 @@ public class QueryableStateIntegrationTest {
public void shouldBeAbleToQueryFilterState() throws Exception {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
batch1.addAll(Arrays.asList(
@@ -449,7 +449,7 @@ public class QueryableStateIntegrationTest {
t1.filterNot(filterPredicate, "queryFilterNot");
t2.to(outputTopic);
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
@@ -482,7 +482,7 @@ public class QueryableStateIntegrationTest {
public void shouldBeAbleToQueryMapValuesState() throws Exception {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new HashSet<>();
batch1.addAll(Arrays.asList(
@@ -511,7 +511,7 @@ public class QueryableStateIntegrationTest {
}, Serdes.Long(), "queryMapValues");
t2.to(Serdes.String(), Serdes.Long(), outputTopic);
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -528,7 +528,7 @@ public class QueryableStateIntegrationTest {
public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new HashSet<>();
batch1.addAll(Arrays.asList(
@@ -567,7 +567,7 @@ public class QueryableStateIntegrationTest {
}, Serdes.Long(), "queryMapValues");
t3.to(Serdes.String(), Serdes.Long(), outputTopic);
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -588,7 +588,7 @@ public class QueryableStateIntegrationTest {
private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator);
@@ -621,7 +621,7 @@ public class QueryableStateIntegrationTest {
s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), outputTopic);
s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -642,12 +642,12 @@ public class QueryableStateIntegrationTest {
@Test
public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream(streamThree);
final String storeName = "count-by-key";
stream.groupByKey().count(storeName);
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
@@ -677,7 +677,7 @@ public class QueryableStateIntegrationTest {
kafkaStreams.close();
// start again
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
// make sure we never get any value other than 8 for hello
@@ -718,7 +718,7 @@ public class QueryableStateIntegrationTest {
final AtomicBoolean failed = new AtomicBoolean(false);
final String storeName = "store";
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(streamOne);
input
.groupByKey()
@@ -736,7 +736,7 @@ public class QueryableStateIntegrationTest {
.to(outputTopic);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 522bd3f..f7757a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -27,17 +27,18 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
-import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -141,13 +142,13 @@ public class RegexSourceIntegrationTest {
CLUSTER.createTopic("TEST-TOPIC-1");
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
streamThreadsField.setAccessible(true);
@@ -155,7 +156,7 @@ public class RegexSourceIntegrationTest {
final StreamThread originalThread = streamThreads[0];
final TestStreamThread testStreamThread = new TestStreamThread(
- builder.internalTopologyBuilder,
+ InternalStreamsBuilderTest.internalTopologyBuilder(builder),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalThread.applicationId,
@@ -201,13 +202,13 @@ public class RegexSourceIntegrationTest {
CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
streamThreadsField.setAccessible(true);
@@ -215,7 +216,7 @@ public class RegexSourceIntegrationTest {
final StreamThread originalThread = streamThreads[0];
final TestStreamThread testStreamThread = new TestStreamThread(
- builder.internalTopologyBuilder,
+ InternalStreamsBuilderTest.internalTopologyBuilder(builder),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalThread.applicationId,
@@ -297,7 +298,7 @@ public class RegexSourceIntegrationTest {
final Serde<String> stringSerde = Serdes.String();
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
@@ -307,7 +308,7 @@ public class RegexSourceIntegrationTest {
pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
@@ -339,8 +340,8 @@ public class RegexSourceIntegrationTest {
public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
final Serde<String> stringSerde = Serdes.String();
- final KStreamBuilder builderLeader = new KStreamBuilder();
- final KStreamBuilder builderFollower = new KStreamBuilder();
+ final StreamsBuilder builderLeader = new StreamsBuilder();
+ final StreamsBuilder builderFollower = new StreamsBuilder();
final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
final KStream<String, String> partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
@@ -350,8 +351,8 @@ public class RegexSourceIntegrationTest {
partitionedStreamLeader.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
partitionedStreamFollower.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final KafkaStreams partitionedStreamsLeader = new KafkaStreams(builderLeader, streamsConfiguration);
- final KafkaStreams partitionedStreamsFollower = new KafkaStreams(builderFollower, streamsConfiguration);
+ final KafkaStreams partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), streamsConfiguration);
+ final KafkaStreams partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), streamsConfiguration);
final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
@@ -362,7 +363,7 @@ public class RegexSourceIntegrationTest {
final StreamThread originalLeaderThread = leaderStreamThreads[0];
final TestStreamThread leaderTestStreamThread = new TestStreamThread(
- builderLeader.internalTopologyBuilder,
+ InternalStreamsBuilderTest.internalTopologyBuilder(builderLeader),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalLeaderThread.applicationId,
@@ -388,7 +389,7 @@ public class RegexSourceIntegrationTest {
final StreamThread originalFollowerThread = followerStreamThreads[0];
final TestStreamThread followerTestStreamThread = new TestStreamThread(
- builderFollower.internalTopologyBuilder,
+ InternalStreamsBuilderTest.internalTopologyBuilder(builderFollower),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalFollowerThread.applicationId,
@@ -429,7 +430,7 @@ public class RegexSourceIntegrationTest {
final Serde<String> stringSerde = Serdes.String();
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
// overlapping patterns here, no messages should be sent as TopologyBuilderException
@@ -442,7 +443,7 @@ public class RegexSourceIntegrationTest {
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 3cff7f7..1a7c3a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -31,11 +31,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -288,8 +289,8 @@ public class ResetIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
- private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
- final KStreamBuilder builder = new KStreamBuilder();
+ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -316,11 +317,11 @@ public class ResetIntegrationTest {
})
.to(Serdes.Long(), Serdes.Long(), outputTopic2);
- return builder;
+ return builder.build();
}
- private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() {
- final KStreamBuilder builder = new KStreamBuilder();
+ private Topology setupTopologyWithoutIntermediateUserTopic() {
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -332,7 +333,7 @@ public class ResetIntegrationTest {
}
}).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
- return builder;
+ return builder.build();
}
private void cleanGlobal(final String intermediateUserTopic) {