Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:07 UTC

[4/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder

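The patch below migrates callers from the deprecated KStreamBuilder (which extends TopologyBuilder) to the new StreamsBuilder, whose build() method produces the Topology that KafkaStreams now consumes. As a minimal before/after sketch of that migration -- with placeholder application id, broker address, and topic names, not values from this patch:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsBuilderMigrationSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "migration-sketch"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // Before (deprecated): the builder itself was handed to KafkaStreams:
            //     final KStreamBuilder builder = new KStreamBuilder();
            //     final KafkaStreams streams = new KafkaStreams(builder, props);

            // After: StreamsBuilder assembles a Topology that KafkaStreams consumes.
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic"); // placeholder topics
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            streams.close();
        }
    }
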
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index e6c0d6e..ad0f236 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -149,7 +148,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(null, name, null, null, null, topics);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -173,7 +172,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -197,7 +196,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -224,7 +223,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -247,7 +246,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -272,7 +271,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -298,7 +297,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -326,7 +325,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -344,7 +343,7 @@ public class TopologyBuilder {
      *                           if not specified the default value deserializer defined in the configs will be used
      * @param topics             the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final Deserializer keyDeserializer,
@@ -353,7 +352,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -374,7 +373,7 @@ public class TopologyBuilder {
      *                           if not specified the default value deserializer defined in the configs will be used
      * @param topics             the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
@@ -385,7 +384,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -422,7 +421,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -461,7 +460,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -482,7 +481,7 @@ public class TopologyBuilder {
      *                           if not specified the default value deserializer defined in the configs will be used
      * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final Deserializer keyDeserializer,
@@ -491,7 +490,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -515,7 +514,7 @@ public class TopologyBuilder {
      *                           if not specified the default value deserializer defined in the configs will be used
      * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
@@ -526,7 +525,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -548,7 +547,7 @@ public class TopologyBuilder {
      *                           if not specified the default value deserializer defined in the configs will be used
      * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
@@ -558,7 +557,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -584,7 +583,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -619,7 +618,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -651,7 +650,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -675,7 +674,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @throws TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
      */
     public synchronized final <K, V> TopologyBuilder addSink(final String name,
                                                              final String topic,
@@ -686,7 +685,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -699,7 +698,7 @@ public class TopologyBuilder {
      * @param predecessorNames the name of one or more source or processor nodes whose output records this processor should receive
      * and process
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
      */
     public synchronized final TopologyBuilder addProcessor(final String name,
                                                            final ProcessorSupplier supplier,
@@ -707,7 +706,7 @@ public class TopologyBuilder {
         try {
             internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -717,14 +716,14 @@ public class TopologyBuilder {
      *
      * @param supplier the supplier used to obtain this state store {@link StateStore} instance
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if state store supplier is already added
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if state store supplier is already added
      */
     public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
                                                             final String... processorNames) {
         try {
             internalTopologyBuilder.addStateStore(supplier, processorNames);
         } catch (final TopologyException e) {
-            throw new TopologyBuilderException(e);
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
         return this;
     }
@@ -742,7 +741,7 @@ public class TopologyBuilder {
             try {
                 internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
             } catch (final TopologyException e) {
-                throw new TopologyBuilderException(e);
+                throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
             }
         }
         return this;
@@ -769,7 +768,7 @@ public class TopologyBuilder {
      *
      * @param processorNames the name of the processors
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
+     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
      */
     public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
         internalTopologyBuilder.connectProcessors(processorNames);

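The recurring edit in TopologyBuilder above is worth spelling out: the import of the deprecated TopologyBuilderException is removed, and each rethrow site refers to the class by its fully qualified name instead. The public contract is unchanged (callers still catch a TopologyBuilderException wrapping the new TopologyException), but the file no longer carries a deprecated import. A condensed, self-contained sketch of the pattern, where doAdd() is a hypothetical stand-in for the internalTopologyBuilder delegate:

    import org.apache.kafka.streams.errors.TopologyException;

    public class LegacyBuilderSketch {
        // Hypothetical stand-in for the delegated internalTopologyBuilder call,
        // which may throw the new (unchecked) TopologyException.
        private void doAdd(final String name) { }

        public synchronized final LegacyBuilderSketch addSomething(final String name) {
            try {
                doAdd(name);
            } catch (final TopologyException e) {
                // Rethrow under the old contract; the fully qualified name avoids
                // importing the deprecated exception class.
                throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
            }
            return this;
        }
    }
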
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index f9ae216..ebdd64d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
@@ -661,7 +660,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 continue;
             }
             if (numPartitions < 0) {
-                throw new TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
+                throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
             }
 
             topicsToMakeReady.put(topic, numPartitions);
@@ -786,7 +785,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     final Integer partitions = metadata.partitionCountForTopic(topic);
 
                     if (partitions == null) {
-                        throw new TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
+                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
                     }
 
                     if (numPartitions == UNKNOWN) {
@@ -794,7 +793,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     } else if (numPartitions != partitions) {
                         final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                         Arrays.sort(topics);
-                        throw new TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
+                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
                     }
                 } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
                     numPartitions = NOT_AVAILABLE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a6532df..c6accde 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -64,7 +63,7 @@ public class KafkaStreamsTest {
     // quick enough)
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final KStreamBuilder builder = new KStreamBuilder();
+    private final StreamsBuilder builder = new StreamsBuilder();
     private KafkaStreams streams;
     private Properties props;
 
@@ -76,13 +75,13 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
-        streams = new KafkaStreams(builder, props);
+        streams = new KafkaStreams(builder.build(), props);
     }
 
     @Test
     public void testStateChanges() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
         final StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
@@ -102,8 +101,8 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateCloseAfterCreate() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
         final StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
@@ -114,11 +113,11 @@ public class KafkaStreamsTest {
     @Test
     public void testStateThreadClose() throws Exception {
         final int numThreads = 2;
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
 
 
@@ -171,11 +170,11 @@ public class KafkaStreamsTest {
     @Test
     public void testStateGlobalThreadClose() throws Exception {
         final int numThreads = 2;
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
 
         streams.start();
@@ -207,8 +206,8 @@ public class KafkaStreamsTest {
     @Test
     public void testInitializesAndDestroysMetricsReporters() throws Exception {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int initDiff = newInitCount - oldInitCount;
         assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -310,8 +309,8 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
-        final KStreamBuilder builder = new KStreamBuilder();
-        new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        new KafkaStreams(builder.build(), props);
     }
 
     @Test
@@ -320,13 +319,13 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-        final KStreamBuilder builder1 = new KStreamBuilder();
-        final KafkaStreams streams1 = new KafkaStreams(builder1, props);
+        final StreamsBuilder builder1 = new StreamsBuilder();
+        final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props);
         streams1.close();
 
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
-        final KStreamBuilder builder2 = new KStreamBuilder();
-        new KafkaStreams(builder2, props);
+        final StreamsBuilder builder2 = new StreamsBuilder();
+        new KafkaStreams(builder2.build(), props);
     }
 
     @Test(expected = IllegalStateException.class)
@@ -363,7 +362,7 @@ public class KafkaStreamsTest {
             props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
             props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-            final KStreamBuilder builder = new KStreamBuilder();
+            final StreamsBuilder builder = new StreamsBuilder();
             final CountDownLatch latch = new CountDownLatch(1);
             final String topic = "input";
             CLUSTER.createTopic(topic);
@@ -382,7 +381,7 @@ public class KafkaStreamsTest {
                             }
                         }
                     });
-            final KafkaStreams streams = new KafkaStreams(builder, props);
+            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
             streams.start();
             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
                                                                             Collections.singletonList(new KeyValue<>("A", "A")),
@@ -407,8 +406,8 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
-        final KStreamBuilder builder = new KStreamBuilder();
-        return new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        return new KafkaStreams(builder.build(), props);
     }
 
     @Test
@@ -417,8 +416,8 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
         streams.cleanUp();
         streams.start();
@@ -432,8 +431,8 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
         streams.start();
         TestUtils.waitForCondition(new TestCondition() {
@@ -477,11 +476,11 @@ public class KafkaStreamsTest {
 
         final String topic = "topic";
         CLUSTER.createTopic(topic);
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         builder.stream(Serdes.String(), Serdes.String(), topic);
 
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
         streams.setStateListener(new KafkaStreams.StateListener() {
             @Override

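The KafkaStreamsTest changes above are mechanical: every new KStreamBuilder() becomes new StreamsBuilder(), and every new KafkaStreams(builder, props) becomes new KafkaStreams(builder.build(), props). DSL entry points such as globalTable() carry over unchanged, so a topology that exercises the global state thread is still declared in one line. A minimal sketch, again with placeholder config values:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class GlobalTableSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-table-sketch"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder

            final StreamsBuilder builder = new StreamsBuilder();
            builder.globalTable("anyTopic"); // adds a global store, so the global state thread runs too
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            streams.close();
        }
    }
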
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
new file mode 100644
index 0000000..b0a0743
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamsBuilderTest {
+
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testFrom() {
+        builder.stream("topic-1", "topic-2");
+
+        builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
+    }
+
+    @Test
+    public void shouldProcessingFromSinkTopic() {
+        final KStream<String, String> source = builder.stream("topic-source");
+        source.to("topic-sink");
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+
+        source.process(processorSupplier);
+
+        driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-source", "A", "aa");
+
+        // no exception was thrown
+        assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+    }
+
+    @Test
+    public void shouldProcessViaThroughTopic() {
+        final KStream<String, String> source = builder.stream("topic-source");
+        final KStream<String, String> through = source.through("topic-sink");
+
+        final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+
+        source.process(sourceProcessorSupplier);
+        through.process(throughProcessorSupplier);
+
+        driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-source", "A", "aa");
+
+        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+    }
+
+    @Test
+    public void testMerge() {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> merged = builder.merge(source1, source2);
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        merged.process(processorSupplier);
+
+        driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        driver.process(topic1, "A", "aa");
+        driver.process(topic2, "B", "bb");
+        driver.process(topic2, "C", "cc");
+        driver.process(topic1, "D", "dd");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+        builder.stream();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+        builder.stream(Serdes.String(), Serdes.String(), null, null);
+    }
+
+}
\ No newline at end of file

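The testFrom case in the new StreamsBuilderTest above also shows that build() hands back a live Topology whose Processor API methods remain callable, which is how the test provokes a name collision: the DSL auto-names source nodes (KStreamImpl.SOURCE_NAME plus a zero-padded counter, so the first source is "KSTREAM-SOURCE-0000000000"), and re-registering that name via addSource throws a TopologyException. A standalone sketch of the same collision:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.TopologyException;

    public class SourceNameCollisionSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("topic-1"); // auto-named KSTREAM-SOURCE-0000000000
            final Topology topology = builder.build();
            try {
                // Reusing the generated name collides with the existing source node.
                topology.addSource("KSTREAM-SOURCE-0000000000", "topic-3");
            } catch (final TopologyException expected) {
                System.out.println("expected collision: " + expected.getMessage());
            }
        }
    }
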
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index c163935..12947d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -280,6 +280,7 @@ public class TopologyTest {
                 @Override
                 public void process(Object key, Object value) { }
 
+                @SuppressWarnings("deprecation")
                 @Override
                 public void punctuate(long timestamp) { }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0c3b36a..fb93783 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -23,12 +23,12 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -144,7 +144,7 @@ public class EosIntegrationTest {
                                    final String inputTopic,
                                    final String throughTopic,
                                    final String outputTopic) throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Long, Long> input = builder.stream(inputTopic);
         KStream<Long, Long> output = input;
         if (throughTopic != null) {
@@ -155,7 +155,7 @@ public class EosIntegrationTest {
         for (int i = 0; i < numberOfRestarts; ++i) {
             final long factor = i;
             final KafkaStreams streams = new KafkaStreams(
-                builder,
+                builder.build(),
                 StreamsTestUtils.getStreamsConfig(
                     applicationId,
                     CLUSTER.bootstrapServers(),
@@ -235,11 +235,11 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
         final KafkaStreams streams = new KafkaStreams(
-            builder,
+            builder.build(),
             StreamsTestUtils.getStreamsConfig(
                 applicationId,
                 CLUSTER.bootstrapServers(),
@@ -578,7 +578,7 @@ public class EosIntegrationTest {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
         injectGC = new AtomicBoolean(false);
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String[] storeNames = null;
         if (withState) {
@@ -657,7 +657,7 @@ public class EosIntegrationTest {
             .to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
         final KafkaStreams streams = new KafkaStreams(
-            builder,
+            builder.build(),
             StreamsTestUtils.getStreamsConfig(
                 applicationId,
                 CLUSTER.bootstrapServers(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index f1f09a4..733ca0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.BeforeClass;
@@ -95,7 +95,7 @@ public class FanoutIntegrationTest {
         //
         // Step 1: Configure and start the processor topology.
         //
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
@@ -122,7 +122,7 @@ public class FanoutIntegrationTest {
         stream2.to(OUTPUT_TOPIC_B);
         stream3.to(OUTPUT_TOPIC_C);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
 
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 869c255..ad0f1c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -23,13 +23,13 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -72,7 +72,7 @@ public class GlobalKTableIntegrationTest {
             return value1 + "+" + value2;
         }
     };
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
     private String globalOne;
@@ -88,7 +88,7 @@ public class GlobalKTableIntegrationTest {
     @Before
     public void before() throws InterruptedException {
         testNo++;
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
         final String applicationId = "globalOne-table-test-" + testNo;
@@ -227,7 +227,7 @@ public class GlobalKTableIntegrationTest {
     }
 
     private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 3658d10..587f478 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -30,11 +30,11 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -50,10 +50,10 @@ import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.Map;
 
-import java.util.Properties;
 import java.util.Arrays;
-import java.util.Locale;
 import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -139,7 +139,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
@@ -157,7 +157,7 @@ public class InternalTopicIntegrationTest {
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
 
         //
@@ -185,7 +185,7 @@ public class InternalTopicIntegrationTest {
 
     @Test
     public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
@@ -203,7 +203,7 @@ public class InternalTopicIntegrationTest {
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
 
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index b69e58b..490c928 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -25,12 +25,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
@@ -68,7 +68,7 @@ public class JoinIntegrationTest {
     private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
     private final static Properties STREAMS_CONFIG = new Properties();
 
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private KStream<Long, String> leftStream;
     private KStream<Long, String> rightStream;
     private KTable<Long, String> leftTable;
@@ -127,7 +127,7 @@ public class JoinIntegrationTest {
     public void prepareTopology() throws Exception {
         CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
 
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
         rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
         leftStream = leftTable.toStream();
@@ -154,7 +154,7 @@ public class JoinIntegrationTest {
         assert expectedResult.size() == input.size();
 
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
         try {
             streams.start();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 372b89c..14a3ea9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -25,12 +26,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
@@ -42,6 +43,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -51,9 +53,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import kafka.utils.MockTime;
-import org.junit.experimental.categories.Category;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
@@ -72,7 +71,7 @@ public class KStreamAggregationDedupIntegrationTest {
 
     private final MockTime mockTime = CLUSTER.time;
     private static volatile int testNo = 0;
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
     private String streamOneInput;
@@ -85,7 +84,7 @@ public class KStreamAggregationDedupIntegrationTest {
     @Before
     public void before() throws InterruptedException {
         testNo++;
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
         String applicationId = "kgrouped-stream-test-" +
@@ -287,7 +286,7 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
     private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 0d5472c..8adae06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -35,7 +36,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -81,7 +81,7 @@ public class KStreamAggregationIntegrationTest {
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
     private String streamOneInput;
@@ -96,7 +96,7 @@ public class KStreamAggregationIntegrationTest {
     @Before
     public void before() throws InterruptedException {
         testNo++;
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
         final String applicationId = "kgrouped-stream-test-" + testNo;
@@ -666,7 +666,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
     private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 5a17566..176a37c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -28,11 +28,11 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
@@ -190,7 +190,7 @@ public class KStreamKTableJoinIntegrationTest {
         final Serde<String> stringSerde = Serdes.String();
         final Serde<Long> longSerde = Serdes.Long();
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         // This KStream contains information such as "alice" -> 13L.
         //
@@ -253,7 +253,7 @@ public class KStreamKTableJoinIntegrationTest {
         // Write the (continuously updating) results to the output topic.
         clicksPerRegion.to(stringSerde, longSerde, outputTopic);
 
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         //

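Only the builder type and the final hand-off change in this join test; the DSL itself, including the serde-typed to(stringSerde, longSerde, outputTopic) overload, is untouched. A compact sketch of a stream-table join under the new builder (topic and store names are placeholders, not from this patch):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.ValueJoiner;

    public class StreamTableJoinSketch {
        static KStream<String, String> clicksPerRegion(final StreamsBuilder builder) {
            final KStream<String, Long> clicks = builder.stream("user-clicks");
            final KTable<String, String> regions = builder.table("user-regions", "user-regions-store");
            // the joiner itself is unaffected by the KStreamBuilder -> StreamsBuilder migration
            return clicks.leftJoin(regions, new ValueJoiner<Long, String, String>() {
                @Override
                public String apply(final Long clickCount, final String region) {
                    return region + "/" + clickCount;
                }
            });
        }
    }
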
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index d3ab176..b653647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -27,12 +27,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -44,7 +44,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
-
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
@@ -70,7 +69,7 @@ public class KStreamRepartitionJoinTest {
     private final MockTime mockTime = CLUSTER.time;
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KStream<Long, Integer> streamOne;
     private KStream<Integer, String> streamTwo;
@@ -89,7 +88,7 @@ public class KStreamRepartitionJoinTest {
     public void before() throws InterruptedException {
         testNo++;
         String applicationId = "kstream-repartition-join-test-" + testNo;
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -356,7 +355,7 @@ public class KStreamRepartitionJoinTest {
 
 
     private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 3fecfc1..d730c6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -28,8 +28,9 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -183,10 +184,10 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         final String outputTopic,
         final List<String> expectedReceivedValues) throws Exception {
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
+        final KStream<String, String> pattern1Stream = builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
+        final KStream<String, String> pattern2Stream = builder.stream(Topology.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
         final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
 
         pattern1Stream.to(stringSerde, stringSerde, outputTopic);
@@ -204,7 +205,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
 
         final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size());
@@ -255,14 +256,14 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
     @Test
     public void shouldThrowExceptionOverlappingTopic() throws  Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
 
         try {
-            builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
-            fail("Should have thrown TopologyBuilderException");
-        } catch (final TopologyBuilderException expected) { }
+            builder.stream(Topology.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
+            fail("Should have thrown TopologyException");
+        } catch (final org.apache.kafka.streams.errors.TopologyException expected) { }
     }
 
     @Test
@@ -277,12 +278,12 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
                 STRING_SERDE_CLASSNAME,
                 props);
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> exceptionStream = builder.stream(NOOP);
 
         exceptionStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, localConfig);
+        KafkaStreams streams = new KafkaStreams(builder.build(), localConfig);
 
         final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
 

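Two renames surface in this file: the offset-reset enum moves from KStreamBuilder.AutoOffsetReset to Topology.AutoOffsetReset, and an overlapping subscription now fails with the public TopologyException rather than the deprecated TopologyBuilderException. A sketch of both, using the same stream() overloads as the test (the pattern and topic name are illustrative):

    import java.util.regex.Pattern;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.TopologyException;

    public class OffsetResetSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // was: KStreamBuilder.AutoOffsetReset.EARLIEST
            builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
            try {
                // "topic-A" is already matched by the pattern above, so this is rejected
                builder.stream(Topology.AutoOffsetReset.LATEST, "topic-A");
            } catch (final TopologyException expected) {
                // was: TopologyBuilderException
            }
        }
    }
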
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 4426a5e..9df1ef5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -337,7 +337,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
         final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
@@ -346,7 +346,7 @@ public class KTableKTableJoinIntegrationTest {
         join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3,
             joinType2, queryableName).to(OUTPUT);
 
-        return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
+        return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig));
     }
 
     private KTable<String, String> join(final KTable<String, String> first,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 3e6abfd..2a06ef4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -30,13 +30,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreamsTest;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;
@@ -189,7 +189,7 @@ public class QueryableStateIntegrationTest {
      * Creates a typical word count topology
      */
     private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
 
@@ -208,7 +208,7 @@ public class QueryableStateIntegrationTest {
         // Create a Windowed State Store that contains the word count for every 1 minute
         groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
 
-        return new KafkaStreams(builder, streamsConfiguration);
+        return new KafkaStreams(builder.build(), streamsConfiguration);
     }
 
     private class StreamRunnable implements Runnable {
@@ -416,7 +416,7 @@ public class QueryableStateIntegrationTest {
     public void shouldBeAbleToQueryFilterState() throws Exception {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
         batch1.addAll(Arrays.asList(
@@ -449,7 +449,7 @@ public class QueryableStateIntegrationTest {
         t1.filterNot(filterPredicate, "queryFilterNot");
         t2.to(outputTopic);
 
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
@@ -482,7 +482,7 @@ public class QueryableStateIntegrationTest {
     public void shouldBeAbleToQueryMapValuesState() throws Exception {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>();
         batch1.addAll(Arrays.asList(
@@ -511,7 +511,7 @@ public class QueryableStateIntegrationTest {
         }, Serdes.Long(), "queryMapValues");
         t2.to(Serdes.String(), Serdes.Long(), outputTopic);
 
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -528,7 +528,7 @@ public class QueryableStateIntegrationTest {
     public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>();
         batch1.addAll(Arrays.asList(
@@ -567,7 +567,7 @@ public class QueryableStateIntegrationTest {
         }, Serdes.Long(), "queryMapValues");
         t3.to(Serdes.String(), Serdes.Long(), outputTopic);
 
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -588,7 +588,7 @@ public class QueryableStateIntegrationTest {
 
     private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
 
         final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator);
@@ -621,7 +621,7 @@ public class QueryableStateIntegrationTest {
         s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), outputTopic);
 
         s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
@@ -642,12 +642,12 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> stream = builder.stream(streamThree);
 
         final String storeName = "count-by-key";
         stream.groupByKey().count(storeName);
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
@@ -677,7 +677,7 @@ public class QueryableStateIntegrationTest {
         kafkaStreams.close();
 
         // start again
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         // make sure we never get any value other than 8 for hello
@@ -718,7 +718,7 @@ public class QueryableStateIntegrationTest {
         final AtomicBoolean failed = new AtomicBoolean(false);
         final String storeName = "store";
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> input = builder.stream(streamOne);
         input
             .groupByKey()
@@ -736,7 +736,7 @@ public class QueryableStateIntegrationTest {
             .to(outputTopic);
 
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {

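One detail worth noting in the restart test above: builder.build() is now called a second time before the new KafkaStreams instance is created. Because build() returns a plain Topology value, an equivalent shape is to build once and reuse the result across restarts; a sketch, assuming builder and streamsConfiguration are set up as in the test:

    // build the topology once ...
    final Topology topology = builder.build();
    KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
    kafkaStreams.start();
    // ... and restart against the same Topology instance after closing
    kafkaStreams.close();
    kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
    kafkaStreams.start();
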
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 522bd3f..f7757a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -27,17 +27,18 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
-import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -141,13 +142,13 @@ public class RegexSourceIntegrationTest {
 
         CLUSTER.createTopic("TEST-TOPIC-1");
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
 
         final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
         streamThreadsField.setAccessible(true);
@@ -155,7 +156,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalThread = streamThreads[0];
 
         final TestStreamThread testStreamThread = new TestStreamThread(
-            builder.internalTopologyBuilder,
+            InternalStreamsBuilderTest.internalTopologyBuilder(builder),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalThread.applicationId,
@@ -201,13 +202,13 @@ public class RegexSourceIntegrationTest {
 
         CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
 
         final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
         streamThreadsField.setAccessible(true);
@@ -215,7 +216,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalThread = streamThreads[0];
 
         final TestStreamThread testStreamThread = new TestStreamThread(
-            builder.internalTopologyBuilder,
+            InternalStreamsBuilderTest.internalTopologyBuilder(builder),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalThread.applicationId,
@@ -297,7 +298,7 @@ public class RegexSourceIntegrationTest {
 
         final Serde<String> stringSerde = Serdes.String();
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
@@ -307,7 +308,7 @@ public class RegexSourceIntegrationTest {
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
@@ -339,8 +340,8 @@ public class RegexSourceIntegrationTest {
     public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
 
         final Serde<String> stringSerde = Serdes.String();
-        final KStreamBuilder builderLeader = new KStreamBuilder();
-        final KStreamBuilder builderFollower = new KStreamBuilder();
+        final StreamsBuilder builderLeader = new StreamsBuilder();
+        final StreamsBuilder builderFollower = new StreamsBuilder();
         final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1,  PARTITIONED_TOPIC_2);
 
         final KStream<String, String> partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
@@ -350,8 +351,8 @@ public class RegexSourceIntegrationTest {
         partitionedStreamLeader.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         partitionedStreamFollower.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        final KafkaStreams partitionedStreamsLeader  = new KafkaStreams(builderLeader, streamsConfiguration);
-        final KafkaStreams partitionedStreamsFollower  = new KafkaStreams(builderFollower, streamsConfiguration);
+        final KafkaStreams partitionedStreamsLeader  = new KafkaStreams(builderLeader.build(), streamsConfiguration);
+        final KafkaStreams partitionedStreamsFollower  = new KafkaStreams(builderFollower.build(), streamsConfiguration);
 
         final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
@@ -362,7 +363,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalLeaderThread = leaderStreamThreads[0];
 
         final TestStreamThread leaderTestStreamThread = new TestStreamThread(
-            builderLeader.internalTopologyBuilder,
+            InternalStreamsBuilderTest.internalTopologyBuilder(builderLeader),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalLeaderThread.applicationId,
@@ -388,7 +389,7 @@ public class RegexSourceIntegrationTest {
         final StreamThread originalFollowerThread = followerStreamThreads[0];
 
         final TestStreamThread followerTestStreamThread = new TestStreamThread(
-            builderFollower.internalTopologyBuilder,
+            InternalStreamsBuilderTest.internalTopologyBuilder(builderFollower),
             streamsConfig,
             new DefaultKafkaClientSupplier(),
             originalFollowerThread.applicationId,
@@ -429,7 +430,7 @@ public class RegexSourceIntegrationTest {
 
         final Serde<String> stringSerde = Serdes.String();
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
 
         // overlapping patterns here, no messages should be sent as TopologyBuilderException
@@ -442,7 +443,7 @@ public class RegexSourceIntegrationTest {
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);

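These tests previously read the InternalTopologyBuilder straight off the builder via builder.internalTopologyBuilder; StreamsBuilder does not expose it, so the patch goes through a helper on InternalStreamsBuilderTest instead. A sketch of the hand-off, assuming the helper's signature matches its use in the hunks above:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

    public class InternalAccessSketch {
        static InternalTopologyBuilder internalOf(final StreamsBuilder builder) {
            // test-only escape hatch; production code no longer reaches the internal builder
            return InternalStreamsBuilderTest.internalTopologyBuilder(builder);
        }
    }
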
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 3cff7f7..1a7c3a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -31,11 +31,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -288,8 +289,8 @@ public class ResetIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
     }
 
-    private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
-        final KStreamBuilder builder = new KStreamBuilder();
+    private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
@@ -316,11 +317,11 @@ public class ResetIntegrationTest {
             })
             .to(Serdes.Long(), Serdes.Long(), outputTopic2);
 
-        return builder;
+        return builder.build();
     }
 
-    private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() {
-        final KStreamBuilder builder = new KStreamBuilder();
+    private Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
@@ -332,7 +333,7 @@ public class ResetIntegrationTest {
             }
         }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
 
-        return builder;
+        return builder.build();
     }
 
     private void cleanGlobal(final String intermediateUserTopic) {