Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:05 UTC

[2/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder

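A minimal sketch (not part of the patch) of the migration pattern these diffs apply throughout the Streams tests and benchmarks: the deprecated org.apache.kafka.streams.kstream.KStreamBuilder is replaced by org.apache.kafka.streams.StreamsBuilder, KafkaStreams is constructed from builder.build() (a Topology) rather than from the builder itself, and TopologyBuilderException gives way to TopologyException. The topic name, application id, and broker address below are illustrative placeholders only, not values from the patch.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class StreamsBuilderMigrationSketch {
        public static void main(final String[] args) {
            // Old API (deprecated by this change):
            //   final KStreamBuilder builder = new KStreamBuilder();
            //   ... define the topology ...
            //   new KafkaStreams(builder, props);
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> source = builder.stream("input-topic");
            source.foreach((key, value) -> System.out.println(key + " -> " + value));

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-migration-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // New API: the Topology is built explicitly and handed to KafkaStreams.
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }
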
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index d47b1e6..b099241 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -18,22 +18,22 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import java.util.List;
-import java.util.Locale;
-import java.util.ArrayList;
-import java.util.Arrays;
+
 import java.io.File;
 import java.io.IOException;
-
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
 
@@ -86,7 +86,7 @@ public class KTableForeachTest {
             };
 
         // When
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
         KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName");
         table.foreach(action);
 
@@ -112,7 +112,7 @@ public class KTableForeachTest {
             public void apply(Number key, Object value) {}
         };
 
-        new KStreamBuilder()
+        new StreamsBuilder()
             .<Integer, String>table("emptyTopic", "emptyStore")
             .foreach(consume);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 1fbaebf..63ea763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -19,16 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -41,9 +39,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -56,7 +54,7 @@ public class KTableImplTest {
 
     private KStreamTestDriver driver = null;
     private File stateDir = null;
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private KTable<String, String> table;
 
     @After
@@ -70,13 +68,13 @@ public class KTableImplTest {
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
-        builder = new KStreamBuilder();
+        builder = new StreamsBuilder();
         table = builder.table("test", "test");
     }
 
     @Test
     public void testKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
         String topic2 = "topic2";
@@ -133,7 +131,7 @@ public class KTableImplTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
         String topic2 = "topic2";
@@ -266,12 +264,11 @@ public class KTableImplTest {
         String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
-        KTableImpl<String, String, String> table2 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
+        builder.table(stringSerde, stringSerde, topic2, storeName2);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -280,7 +277,7 @@ public class KTableImplTest {
                         return new Integer(value);
                     }
                 });
-        KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+        table1Mapped.filter(
                 new Predicate<String, Integer>() {
                     @Override
                     public boolean test(String key, Integer value) {
@@ -302,7 +299,7 @@ public class KTableImplTest {
         String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
@@ -343,19 +340,17 @@ public class KTableImplTest {
         String topic1 = "topic1";
         String storeName1 = "storeName1";
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
 
-        KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
-                .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
-                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
+        table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
 
 
-        KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
-                .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
-                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
+        table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
 
         driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
         driver.setTime(0L);
@@ -405,16 +400,19 @@ public class KTableImplTest {
         table.mapValues(null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
         table.writeAsText(null);
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @SuppressWarnings("deprecation")
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
         table.writeAsText("\t  \t");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullActionOnForEach() throws Exception {
         table.foreach(null);
@@ -442,22 +440,22 @@ public class KTableImplTest {
 
     @Test
     public void shouldAllowNullStoreInJoin() throws Exception {
-        table.join(table, MockValueJoiner.TOSTRING_JOINER, null, (String) null);
+        table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierInJoin() throws Exception {
-        table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+        table.join(table, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierInLeftJoin() throws Exception {
-        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+        table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierInOuterJoin() throws Exception {
-        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+        table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index fdc07f3..668e7f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -31,6 +31,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -67,11 +68,24 @@ public class KTableKTableJoinTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    private void doTestJoin(final KStreamBuilder builder,
+    public static Collection<Set<String>> getCopartitionedGroups(StreamsBuilder builder) {
+        // TODO: we should refactor this to avoid usage of reflection
+        try {
+            final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
+            internalStreamsBuilderField.setAccessible(true);
+            final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(builder);
+
+            return internalStreamsBuilder.internalTopologyBuilder.copartitionGroups();
+        } catch (final NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void doTestJoin(final StreamsBuilder builder,
                             final int[] expectedKeys,
                             final MockProcessorSupplier<Integer, String> processor,
                             final KTable<Integer, String> joined) {
-        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -163,7 +177,7 @@ public class KTableKTableJoinTest {
 
     @Test
     public void testJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -184,7 +198,7 @@ public class KTableKTableJoinTest {
 
     @Test
     public void testQueryableJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -202,7 +216,7 @@ public class KTableKTableJoinTest {
         doTestJoin(builder, expectedKeys, processor, joined);
     }
 
-    private void doTestSendingOldValues(final KStreamBuilder builder,
+    private void doTestSendingOldValues(final StreamsBuilder builder,
                                         final int[] expectedKeys,
                                         final KTable<Integer, String> table1,
                                         final KTable<Integer, String> table2,
@@ -285,7 +299,7 @@ public class KTableKTableJoinTest {
 
     @Test
     public void testNotSendingOldValues() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -298,7 +312,7 @@ public class KTableKTableJoinTest {
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
 
@@ -306,7 +320,7 @@ public class KTableKTableJoinTest {
 
     @Test
     public void testQueryableNotSendingOldValues() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -319,7 +333,7 @@ public class KTableKTableJoinTest {
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
 
@@ -327,7 +341,7 @@ public class KTableKTableJoinTest {
 
     @Test
     public void testSendingOldValues() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -341,7 +355,7 @@ public class KTableKTableJoinTest {
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index dce19e3..7913902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -74,7 +74,7 @@ public class KTableKTableLeftJoinTest {
 
     @Test
     public void testJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -85,7 +85,7 @@ public class KTableKTableLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         joined.toStream().process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -168,7 +168,7 @@ public class KTableKTableLeftJoinTest {
 
     @Test
     public void testNotSendingOldValue() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -182,7 +182,7 @@ public class KTableKTableLeftJoinTest {
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver = new KStreamTestDriver(builder, stateDir);
         driver.setTime(0L);
@@ -249,7 +249,7 @@ public class KTableKTableLeftJoinTest {
 
     @Test
     public void testSendingOldValue() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -265,7 +265,7 @@ public class KTableKTableLeftJoinTest {
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver = new KStreamTestDriver(builder, stateDir);
         driver.setTime(0L);
@@ -346,7 +346,7 @@ public class KTableKTableLeftJoinTest {
         final String tableSix = "tableSix";
         final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix};
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg)
                 .groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index a60eae3..1ea86dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -69,7 +69,7 @@ public class KTableKTableOuterJoinTest {
 
     @Test
     public void testJoin() throws Exception {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -84,7 +84,7 @@ public class KTableKTableOuterJoinTest {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -174,7 +174,7 @@ public class KTableKTableOuterJoinTest {
 
     @Test
     public void testNotSendingOldValue() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -188,7 +188,7 @@ public class KTableKTableOuterJoinTest {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
@@ -262,7 +262,7 @@ public class KTableKTableOuterJoinTest {
 
     @Test
     public void testSendingOldValue() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
@@ -278,7 +278,7 @@ public class KTableKTableOuterJoinTest {
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
         proc = new MockProcessorSupplier<>();
-        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver = new KStreamTestDriver(builder, stateDir);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 5ff01e6..9c5955a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -19,14 +19,13 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +60,7 @@ public class KTableMapKeysTest {
 
     @Test
     public void testMapKeysConvertingToStream() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic_map_keys";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 70b1a55..6bdf83d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -58,7 +58,7 @@ public class KTableMapValuesTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    private void doTestKTable(final KStreamBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
+    private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
         driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
 
         driver.process(topic1, "A", "1");
@@ -71,7 +71,7 @@ public class KTableMapValuesTest {
 
     @Test
     public void testKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -91,7 +91,7 @@ public class KTableMapValuesTest {
 
     @Test
     public void testQueryableKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -109,7 +109,7 @@ public class KTableMapValuesTest {
         doTestKTable(builder, topic1, proc2);
     }
 
-    private void doTestValueGetter(final KStreamBuilder builder,
+    private void doTestValueGetter(final StreamsBuilder builder,
                                    final String topic1,
                                    final KTableImpl<String, String, String> table1,
                                    final KTableImpl<String, String, Integer> table2,
@@ -212,7 +212,7 @@ public class KTableMapValuesTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
         String topic2 = "topic2";
@@ -243,7 +243,7 @@ public class KTableMapValuesTest {
 
     @Test
     public void testQueryableValueGetter() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
         String topic2 = "topic2";
@@ -274,7 +274,7 @@ public class KTableMapValuesTest {
 
     @Test
     public void testNotSendingOldValue() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -290,7 +290,7 @@ public class KTableMapValuesTest {
 
         MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc", proc, table2.name);
+        builder.build().addProcessor("proc", proc, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
         assertFalse(table1.sendingOldValueEnabled());
@@ -322,7 +322,7 @@ public class KTableMapValuesTest {
 
     @Test
     public void testSendingOldValue() throws IOException {
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -340,7 +340,7 @@ public class KTableMapValuesTest {
 
         MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc", proc, table2.name);
+        builder.build().addProcessor("proc", proc, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
         assertTrue(table1.sendingOldValueEnabled());

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 20a6e9a..77d2b19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -58,7 +58,7 @@ public class KTableSourceTest {
 
     @Test
     public void testKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -82,7 +82,7 @@ public class KTableSourceTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -126,7 +126,7 @@ public class KTableSourceTest {
 
     @Test
     public void testNotSendingOldValue() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -134,7 +134,7 @@ public class KTableSourceTest {
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc1", proc1, table1.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
         driver.process(topic1, "A", "01");
@@ -164,7 +164,7 @@ public class KTableSourceTest {
 
     @Test
     public void testSendingOldValue() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         String topic1 = "topic1";
 
@@ -176,7 +176,7 @@ public class KTableSourceTest {
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
-        builder.addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc1", proc1, table1.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index ec3ea0e..7453cc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -32,14 +32,14 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -51,9 +51,9 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
-import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -280,13 +280,13 @@ public class SimpleBenchmark {
 
 
     private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Integer, byte[]> input = builder.stream(topic);
 
         input.groupByKey()
             .count("tmpStoreName").foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return new KafkaStreams(builder.build(), streamConfig);
     }
 
 
@@ -584,7 +584,7 @@ public class SimpleBenchmark {
     private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
         Properties props = setStreamProperties("simple-benchmark-streams");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
 
@@ -623,7 +623,7 @@ public class SimpleBenchmark {
     private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
         final Properties props = setStreamProperties("simple-benchmark-streams-with-sink");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
 
@@ -682,7 +682,7 @@ public class SimpleBenchmark {
 
     private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
                                                              String kTableTopic, final CountDownLatch latch) {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
         final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
@@ -694,7 +694,7 @@ public class SimpleBenchmark {
 
     private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
                                                             String kTableTopic2, final CountDownLatch latch) {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
         final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
@@ -706,7 +706,7 @@ public class SimpleBenchmark {
 
     private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
                                                               String kStreamTopic2, final CountDownLatch latch) {
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic1);
         final KStream<Long, byte[]> input2 = builder.stream(kStreamTopic2);
@@ -722,7 +722,7 @@ public class SimpleBenchmark {
                                                           boolean enableCaching) {
         Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
 
-        KStreamBuilder builder = new KStreamBuilder();
+        StreamsBuilder builder = new StreamsBuilder();
 
         if (enableCaching) {
             builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build());
@@ -767,8 +767,8 @@ public class SimpleBenchmark {
         return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
-    private KafkaStreams createKafkaStreamsWithExceptionHandler(final KStreamBuilder builder, final Properties props) {
-        final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+    private KafkaStreams createKafkaStreamsWithExceptionHandler(final StreamsBuilder builder, final Properties props) {
+        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
         streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 56c578f..02e532d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -26,9 +26,9 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -36,13 +36,13 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.UUID;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 
 
@@ -280,7 +280,7 @@ public class YahooBenchmark {
         serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
         projectedEventDeserializer.configure(serdeProps, false);
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(),
             Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), eventsTopic);
         final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
@@ -356,6 +356,6 @@ public class YahooBenchmark {
             .groupByKey(Serdes.String(), Serdes.String())
             .count(TimeWindows.of(10 * 1000), "time-windows");
 
-        return new KafkaStreams(builder, streamConfig);
+        return new KafkaStreams(builder.build(), streamConfig);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 07be6e8..a947218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -40,9 +40,9 @@ import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.St
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class GlobalStreamThreadTest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9ac2f99..4022a47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -47,6 +47,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -328,9 +329,15 @@ public class StandbyTaskTest {
 
         restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
                 new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.stream("topic").groupByKey().count("my-store");
-        final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
+        final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+        builder.stream(null, null, null, null, "topic").groupByKey().count("my-store");
+
+        // TODO: we should refactor this to avoid usage of reflection
+        final Field internalTopologyBuilderField = builder.getClass().getDeclaredField("internalTopologyBuilder");
+        internalTopologyBuilderField.setAccessible(true);
+        final InternalTopologyBuilder internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(builder);
+
+        final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
         StreamsConfig config = createConfig(baseDir);
 
         new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 33e43e2..7b2956b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -28,12 +28,13 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -934,11 +935,13 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
+    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() throws Exception {
         final String applicationId = "application-id";
 
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
+        internalTopologyBuilder.setApplicationId(applicationId);
 
         KStream<Object, Object> stream1 = builder
 
@@ -997,7 +1000,7 @@ public class StreamPartitionAssignorTest {
         final String client = "client1";
 
         final StreamThread streamThread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             mockClientSupplier,
             applicationId,
@@ -1005,7 +1008,7 @@ public class StreamPartitionAssignorTest {
             uuid,
             new Metrics(),
             Time.SYSTEM,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -1077,19 +1080,22 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+        final String applicationId = "appId";
         final Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         final StreamsConfig config = new StreamsConfig(props);
-        final KStreamBuilder builder = new KStreamBuilder();
-        final String applicationId = "appId";
-        builder.setApplicationId(applicationId);
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
+        internalTopologyBuilder.setApplicationId(applicationId);
+
         builder.stream("topic1").groupByKey().count("count");
 
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
         final StreamThread streamThread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             mockClientSupplier,
             applicationId,
@@ -1097,7 +1103,7 @@ public class StreamPartitionAssignorTest {
             uuid,
             new Metrics(),
             Time.SYSTEM,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 29edf6a..026d731 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -93,7 +93,8 @@ public class StreamThreadTest {
     private final Metrics metrics = new Metrics();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private UUID processId = UUID.randomUUID();
-    final KStreamBuilder builder = new KStreamBuilder();
+    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+    private InternalTopologyBuilder internalTopologyBuilder;
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final StateDirectory stateDirectory  = new StateDirectory("applicationId", stateDir, mockTime);
@@ -102,7 +103,13 @@ public class StreamThreadTest {
     @Before
     public void setUp() throws Exception {
         processId = UUID.randomUUID();
-        builder.setApplicationId(applicationId);
+
+        // TODO: we should refactor this to avoid usage of reflection
+        final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
+        internalTopologyBuilderField.setAccessible(true);
+        internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
+
+        internalTopologyBuilder.setApplicationId(applicationId);
     }
 
     private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -258,7 +265,7 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
-        builder.addSource("source1", "topic1");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
 
         final StreamThread thread = getStreamThread();
 
@@ -352,10 +359,10 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addSource("source3", "topic3");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
+        internalTopologyBuilder.addSource(null, "source2", null, null, null, "topic2");
+        internalTopologyBuilder.addSource(null, "source3", null, null, null, "topic3");
+        internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
         final StreamThread thread = getStreamThread();
 
@@ -448,7 +455,7 @@ public class StreamThreadTest {
     public void testStateChangeStartClose() throws InterruptedException {
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -457,7 +464,7 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
 
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -489,7 +496,7 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException {
-        builder.addStateStore(
+        internalTopologyBuilder.addStateStore(
             Stores
                 .create("store")
                 .withByteArrayKeys()
@@ -497,12 +504,12 @@ public class StreamThreadTest {
                 .persistent()
                 .build()
         );
-        builder.addSource("source", TOPIC);
+        internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
 
         //clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
 
         final StreamThread thread1 = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -510,11 +517,11 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
         final StreamThread thread2 = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -522,7 +529,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -627,7 +634,7 @@ public class StreamThreadTest {
     @Test
     public void testMetrics() {
         final StreamThread thread = new StreamThread(
-                builder.internalTopologyBuilder,
+                internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -635,7 +642,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0, stateDirectory);
         final String defaultGroupName = "stream-metrics";
         final String defaultPrefix = "thread." + thread.threadClientId();
@@ -678,10 +685,10 @@ public class StreamThreadTest {
 
             final StreamsConfig config = new StreamsConfig(props);
 
-            builder.addSource("source1", "topic1");
+            internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
 
             final StreamThread thread = new StreamThread(
-                    builder.internalTopologyBuilder,
+                    internalTopologyBuilder,
                     config,
                     clientSupplier,
                     applicationId,
@@ -689,7 +696,7 @@ public class StreamThreadTest {
                     processId,
                     metrics,
                     mockTime,
-                    new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                    new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                     0, stateDirectory) {
 
                 @Override
@@ -770,10 +777,10 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
-        builder.addSource("source1", "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -781,7 +788,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -806,11 +813,11 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
-        builder.addSource("source1", "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -818,7 +825,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -846,11 +853,11 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
-        builder.addSource("source1", "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -858,7 +865,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -881,10 +888,10 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseThreadProducerOnCloseIfEosDisabled() throws InterruptedException {
-        builder.addSource("source1", "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -892,7 +899,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -913,10 +920,10 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
-        builder.addSource("name", "topic").addSink("out", "output");
+        internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+        internalTopologyBuilder.addSink("out", "output", null, null, null);
 
-        final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+        final StreamThread thread = new StreamThread(internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -924,7 +931,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -942,13 +949,14 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotCloseSuspendedTaskswice() throws Exception {
-        builder.addSource("name", "topic").addSink("out", "output");
+        internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+        internalTopologyBuilder.addSink("out", "output", null, null, null);
 
         final TestStreamTask testStreamTask = new TestStreamTask(
                 new TaskId(0, 0),
                 applicationId,
                 Utils.mkSet(new TopicPartition("topic", 0)),
-                builder.build(0),
+                internalTopologyBuilder.build(0),
                 clientSupplier.consumer,
                 clientSupplier.getProducer(new HashMap<String, Object>()),
                 clientSupplier.restoreConsumer,
@@ -957,7 +965,7 @@ public class StreamThreadTest {
                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime));
 
         final StreamThread thread = new StreamThread(
-                builder.internalTopologyBuilder,
+                internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -965,7 +973,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0,
                 stateDirectory) {
 
@@ -1005,11 +1013,11 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks()  {
-        builder.stream("t1").groupByKey().count("count-one");
-        builder.stream("t2").groupByKey().count("count-two");
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+        internalStreamsBuilder.stream(null, null, null, null, "t2").groupByKey().count("count-two");
 
         final StreamThread thread = new StreamThread(
-                builder.internalTopologyBuilder,
+                internalTopologyBuilder,
                 config,
                 clientSupplier,
                 applicationId,
@@ -1017,7 +1025,7 @@ public class StreamThreadTest {
                 processId,
                 metrics,
                 mockTime,
-                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                 0,
                 stateDirectory);
 
@@ -1063,11 +1071,11 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
-        builder.stream("t1").groupByKey().count("count-one");
-        builder.stream("t2").groupByKey().count("count-two");
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+        internalStreamsBuilder.stream(null, null, null, null, "t2").groupByKey().count("count-two");
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1075,7 +1083,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1135,12 +1143,12 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
-        builder.stream(Pattern.compile("t.*")).to("out");
+        internalStreamsBuilder.stream(null, null, null, null, Pattern.compile("t.*")).to("out");
 
         final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1148,7 +1156,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory) {
 
@@ -1190,7 +1198,7 @@ public class StreamThreadTest {
         updatedTopicsField.setAccessible(true);
         final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
         updatedTopics.add(t1.topic());
-        builder.updateSubscriptions(subscriptionUpdates, null);
+        internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, null);
 
         // should create task for id 0_0 with a single partition
         thread.setState(StreamThread.State.RUNNING);
@@ -1217,11 +1225,12 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
-        builder.addSource("source", TOPIC).addSink("sink", "dummyTopic", "source");
+        internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
+        internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -1229,7 +1238,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -1314,11 +1323,12 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
-        builder.addSource("name", "topic").addSink("out", "output");
+        internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+        internalTopologyBuilder.addSink("out", "output", null, null, null);
 
         final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
@@ -1326,7 +1336,7 @@ public class StreamThreadTest {
             processId,
             new Metrics(),
             new MockTime(),
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -1352,13 +1362,13 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
-        builder.stream("t1").groupByKey();
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
             new TaskId(0, 0),
             applicationId,
             Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
+            internalTopologyBuilder.build(0),
             clientSupplier.consumer,
             clientSupplier.getProducer(new HashMap<String, Object>()),
             clientSupplier.restoreConsumer,
@@ -1373,7 +1383,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1381,7 +1391,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory) {
 
@@ -1408,12 +1418,12 @@ public class StreamThreadTest {
     @Test
     public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
         final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
-        builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
         final TestStreamTask testStreamTask = new TestStreamTask(
             new TaskId(0, 0),
             applicationId,
             Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
+            internalTopologyBuilder.build(0),
             clientSupplier.consumer,
             clientSupplier.getProducer(new HashMap<String, Object>()),
             clientSupplier.restoreConsumer,
@@ -1428,7 +1438,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1436,7 +1446,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory) {
 
@@ -1474,13 +1484,13 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception {
-        builder.stream("t1").groupByKey();
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
             new TaskId(0, 0),
             applicationId,
             Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
+            internalTopologyBuilder.build(0),
             clientSupplier.consumer,
             clientSupplier.getProducer(new HashMap<String, Object>()),
             clientSupplier.restoreConsumer,
@@ -1495,7 +1505,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1503,7 +1513,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory) {
 
@@ -1534,13 +1544,13 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
-        builder.stream("t1").groupByKey();
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
             new TaskId(0, 0),
             applicationId,
             Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
+            internalTopologyBuilder.build(0),
             clientSupplier.consumer,
             clientSupplier.getProducer(new HashMap<String, Object>()),
             clientSupplier.restoreConsumer,
@@ -1555,7 +1565,7 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1563,7 +1573,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory) {
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1594,11 +1604,11 @@ public class StreamThreadTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
-        builder.addSource("source", Pattern.compile("t.*"));
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        internalTopologyBuilder.addSource(null, "source", null, null, null, Pattern.compile("t.*"));
+        internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1606,7 +1616,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory);
 
@@ -1619,13 +1629,12 @@ public class StreamThreadTest {
 
         thread.setPartitionAssignor(partitionAssignor);
 
-        final Field
-            nodeToSourceTopicsField =
-            builder.internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
+        final Field nodeToSourceTopicsField =
+            internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
         nodeToSourceTopicsField.setAccessible(true);
         final Map<String, List<String>>
             nodeToSourceTopics =
-            (Map<String, List<String>>) nodeToSourceTopicsField.get(builder.internalTopologyBuilder);
+            (Map<String, List<String>>) nodeToSourceTopicsField.get(internalTopologyBuilder);
         final List<TopicPartition> topicPartitions = new ArrayList<>();
 
         final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
@@ -1833,7 +1842,7 @@ public class StreamThreadTest {
         final String storeName = "store";
         final String changelogTopic = applicationId + "-" + storeName + "-changelog";
 
-        builder.stream("topic1").groupByKey().count(storeName);
+        internalStreamsBuilder.stream(null, null, null, null, "topic1").groupByKey().count(storeName);
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.restoreConsumer.updatePartitions(changelogTopic,
@@ -1850,7 +1859,7 @@ public class StreamThreadTest {
         });
 
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1858,7 +1867,7 @@ public class StreamThreadTest {
             processId,
             new Metrics(),
             new MockTime(),
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory) {
 
@@ -1933,7 +1942,7 @@ public class StreamThreadTest {
 
     private StreamThread getStreamThread() {
         return new StreamThread(
-            builder.internalTopologyBuilder,
+            internalTopologyBuilder,
             config,
             clientSupplier,
             applicationId,
@@ -1941,7 +1950,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
             0,
             stateDirectory) {
 

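Note (not part of the patch): the new public StreamsBuilder no longer exposes an internal topology builder field, so the StreamThreadTest setup above pulls the InternalTopologyBuilder out of the InternalStreamsBuilder via reflection (see the TODO in the first hunk). A minimal, self-contained sketch of that lookup follows; the package layout mirrors trunk at the time of this patch, and the helper class name TopologyBuilderAccessor is purely illustrative:

    import java.lang.reflect.Field;

    import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

    final class TopologyBuilderAccessor {
        // Same reflective lookup as in the test setup: read the non-public
        // "internalTopologyBuilder" field off an InternalStreamsBuilder.
        static InternalTopologyBuilder internalTopologyBuilder(final InternalStreamsBuilder builder) throws Exception {
            final Field field = builder.getClass().getDeclaredField("internalTopologyBuilder");
            field.setAccessible(true);
            return (InternalTopologyBuilder) field.get(builder);
        }
    }
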
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 8ee1d6e..d394abe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -23,10 +23,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.StreamsMetadata;
@@ -56,7 +57,7 @@ public class StreamsMetadataStateTest {
     private TopicPartition topic2P0;
     private TopicPartition topic3P0;
     private Map<HostInfo, Set<TopicPartition>> hostToPartitions;
-    private KStreamBuilder builder;
+    private StreamsBuilder builder;
     private TopicPartition topic1P1;
     private TopicPartition topic2P1;
     private TopicPartition topic4P0;
@@ -66,8 +67,8 @@ public class StreamsMetadataStateTest {
     private StreamPartitioner<String, Object> partitioner;
 
     @Before
-    public void before() {
-        builder = new KStreamBuilder();
+    public void before() throws Exception {
+        builder = new StreamsBuilder();
         final KStream<Object, Object> one = builder.stream("topic-one");
         one.groupByKey().count("table-one");
 
@@ -89,7 +90,7 @@ public class StreamsMetadataStateTest {
 
         builder.globalTable("global-topic", "global-table");
 
-        builder.setApplicationId("appId");
+        InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
 
         topic1P0 = new TopicPartition("topic-one", 0);
         topic1P1 = new TopicPartition("topic-one", 1);
@@ -115,7 +116,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new StreamsMetadataState(builder.internalTopologyBuilder, hostOne);
+        discovery = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), hostOne);
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
@@ -127,7 +128,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
-        new StreamsMetadataState(builder.internalTopologyBuilder, hostOne).getAllMetadataForStore("store");
+        new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
     }
 
     @Test
@@ -294,7 +295,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
     }
@@ -307,7 +308,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));
     }

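Note (not part of the patch): StreamsMetadataStateTest now builds its topology with the public StreamsBuilder and reaches the InternalTopologyBuilder through the InternalStreamsBuilderTest.internalTopologyBuilder(...) helper rather than the old builder.internalTopologyBuilder field. Below is a rough, self-contained sketch of that wiring, assuming the helper performs the reflective lookup shown earlier; the class name MetadataStateWiringSketch and the host/port values are arbitrary:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
    import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
    import org.apache.kafka.streams.state.HostInfo;

    public class MetadataStateWiringSketch {
        public static void main(final String[] args) throws Exception {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("topic-one").groupByKey().count("table-one");

            // Replaces the old builder.internalTopologyBuilder field access.
            final InternalTopologyBuilder internal =
                InternalStreamsBuilderTest.internalTopologyBuilder(builder);
            internal.setApplicationId("appId");

            final StreamsMetadataState discovery =
                new StreamsMetadataState(internal, new HostInfo("localhost", 8080));
            System.out.println(discovery.getAllMetadata());
        }
    }
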
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index c01e169..d7078f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
@@ -70,10 +70,10 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout + 1);
 
 
-        final KStreamBuilder builder = new KStreamBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(SOURCE_TOPIC).to(SINK_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {