Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/28 23:46:38 UTC

[1/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder

Repository: kafka
Updated Branches:
  refs/heads/trunk c50c941af -> 1844bf2b2
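
For orientation: KIP-120 introduces a public Topology class (with TopologyException) to replace the deprecated TopologyBuilder (and TopologyBuilderException). Below is a minimal sketch of the new API, using only calls exercised by the tests in this patch; MyProcessorSupplier is a hypothetical stand-in for any ProcessorSupplier implementation:

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyDescription;

    final Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    // MyProcessorSupplier is a hypothetical ProcessorSupplier implementation
    topology.addProcessor("processor", new MyProcessorSupplier(), "source");
    topology.addSink("sink", "output-topic", "processor");
    // describe() returns a printable summary of the sub-topologies
    final TopologyDescription description = topology.describe();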


http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
new file mode 100644
index 0000000..c163935
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class TopologyTest {
+
+    private final Topology topology = new Topology();
+    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
+        topology.addSource((String) null, "topic");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
+        topology.addSource(null, Pattern.compile(".*"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicsWhenAddingSourceWithTopic() {
+        topology.addSource("source", (String[]) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
+        topology.addSource("source", (Pattern) null);
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowZeroTopicsWhenAddingSource() {
+        topology.addSource("source");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingProcessor() {
+        topology.addProcessor(null, new ProcessorSupplier() {
+            @Override
+            public Processor get() {
+                return new MockProcessorSupplier().get();
+            }
+        });
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
+        topology.addProcessor("name", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSink() {
+        topology.addSink(null, "topic");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicWhenAddingSink() {
+        topology.addSink("name", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
+        topology.connectProcessorAndStateStores(null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
+        topology.connectProcessorAndStateStores("processor", (String[]) null);
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
+        topology.connectProcessorAndStateStores("processor");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAddNullStateStoreSupplier() {
+        topology.addStateStore(null);
+    }
+
+    @Test
+    public void shouldNotAllowToAddSourcesWithSameName() {
+        topology.addSource("source", "topic-1");
+        try {
+            topology.addSource("source", "topic-2");
+            fail("Should throw TopologyException for duplicate source name");
+        } catch (TopologyException expected) { }
+    }
+
+    @Test
+    public void shouldNotAllowToAddTopicTwice() {
+        topology.addSource("source", "topic-1");
+        try {
+            topology.addSource("source-2", "topic-1");
+            fail("Should throw TopologyException for already used topic");
+        } catch (TopologyException expected) { }
+    }
+
+    @Test
+    public void testPatternMatchesAlreadyProvidedTopicSource() {
+        topology.addSource("source-1", "foo");
+        try {
+            topology.addSource("source-2", Pattern.compile("f.*"));
+            fail("Should have thrown TopologyException for overlapping pattern with already registered topic");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test
+    public void testNamedTopicMatchesAlreadyProvidedPattern() {
+        topology.addSource("source-1", Pattern.compile("f.*"));
+        try {
+            topology.addSource("source-2", "foo");
+            fail("Should have thrown TopologyException for overlapping topic with already registered pattern");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test
+    public void shouldNotAllowToAddProcessorWithSameName() {
+        topology.addSource("source", "topic-1");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        try {
+            topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+            fail("Should throw TopologyException for duplicate processor name");
+        } catch (TopologyException expected) { }
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldFailOnUnknownSource() {
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldFailIfNodeIsItsOwnParent() {
+        topology.addProcessor("processor", new MockProcessorSupplier(), "processor");
+    }
+
+    @Test
+    public void shouldNotAllowToAddSinkWithSameName() {
+        topology.addSource("source", "topic-1");
+        topology.addSink("sink", "topic-2", "source");
+        try {
+            topology.addSink("sink", "topic-3", "source");
+            fail("Should throw TopologyException for duplicate sink name");
+        } catch (TopologyException expected) { }
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldFailWithUnknownParent() {
+        topology.addSink("sink", "topic-2", "source");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldFailIfSinkIsItsOwnParent() {
+        topology.addSink("sink", "topic-2", "sink");
+    }
+
+    @Test
+    public void shouldFailIfSinkIsParent() {
+        topology.addSource("source", "topic-1");
+        topology.addSink("sink-1", "topic-2", "source");
+        try {
+            topology.addSink("sink-2", "topic-3", "sink-1");
+            fail("Should throw TopologyException for using sink as parent");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
+        topology.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+    }
+
+    @Test
+    public void shouldNotAllowToAddStateStoreToSource() {
+        topology.addSource("source-1", "topic-1");
+        try {
+            topology.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+            fail("Should have thrown TopologyException for adding store to source node");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test
+    public void shouldNotAllowToAddStateStoreToSink() {
+        topology.addSink("sink-1", "topic-1");
+        try {
+            topology.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+            fail("Should have thrown TopologyException for adding store to sink node");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test
+    public void shouldNotAllowToAddStoreWithSameName() {
+        topology.addStateStore(new MockStateStoreSupplier("store", false));
+        try {
+            topology.addStateStore(new MockStateStoreSupplier("store", false));
+            fail("Should have thrown TopologyException for duplicate store name");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
+        final String sourceNodeName = "source";
+        final String goodNodeName = "goodGuy";
+        final String badNodeName = "badGuy";
+
+        final Properties config = new Properties();
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+
+        topology
+            .addSource(sourceNodeName, "topic")
+            .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+            .addStateStore(
+                Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+                goodNodeName)
+            .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
+        try {
+            new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+        } catch (final StreamsException e) {
+            final Throwable cause = e.getCause();
+            if (cause != null
+                && cause instanceof TopologyBuilderException
+                && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
+                throw (TopologyBuilderException) cause;
+            } else {
+                throw new RuntimeException("Did expect different exception. Did catch:", e);
+            }
+        }
+    }
+
+    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+        final static String STORE_NAME = "store";
+
+        @Override
+        public Processor get() {
+            return new Processor() {
+                @Override
+                public void init(ProcessorContext context) {
+                    context.getStateStore(STORE_NAME);
+                }
+
+                @Override
+                public void process(Object key, Object value) { }
+
+                @Override
+                public void punctuate(long timestamp) { }
+
+                @Override
+                public void close() { }
+            };
+        }
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+        topology.addGlobalStore(
+            new MockStateStoreSupplier("anyName", false, false),
+            "sameName",
+            null,
+            null,
+            "anyTopicName",
+            "sameName",
+            new MockProcessorSupplier());
+    }
+
+    @Test
+    public void shouldDescribeEmptyTopology() {
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void singleSourceShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+
+        expectedDescription.addSubtopology(
+            new InternalTopologyBuilder.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
+
+        expectedDescription.addSubtopology(
+            new InternalTopologyBuilder.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void singleSourcePatternShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
+
+        expectedDescription.addSubtopology(
+            new InternalTopologyBuilder.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void multipleSourcesShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+        expectedDescription.addSubtopology(
+            new InternalTopologyBuilder.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        expectedDescription.addSubtopology(
+            new InternalTopologyBuilder.Subtopology(1,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        expectedDescription.addSubtopology(
+            new InternalTopologyBuilder.Subtopology(2,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void sourceAndProcessorShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final String[] store = new String[] {"store"};
+        final TopologyDescription.Processor expectedProcessorNode
+            = addProcessorWithNewStore("processor", store, expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+
+    @Test
+    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final String[] stores = new String[] {"store1", "store2"};
+        final TopologyDescription.Processor expectedProcessorNode
+            = addProcessorWithNewStore("processor", stores, expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode);
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedProcessorNode2);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
+        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode1, expectedSourceNode2);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
+        allNodes1.add(expectedSourceNode1);
+        allNodes1.add(expectedProcessorNode1);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
+
+        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
+        allNodes2.add(expectedSourceNode2);
+        allNodes2.add(expectedProcessorNode2);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
+
+        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
+        allNodes3.add(expectedSourceNode3);
+        allNodes3.add(expectedProcessorNode3);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+        final TopologyDescription.Sink expectedSinkNode1 = addSink("sink1", "sinkTopic1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Sink expectedSinkNode2 = addSink("sink2", "sinkTopic2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Sink expectedSinkNode3 = addSink("sink3", "sinkTopic3", expectedSourceNode3);
+
+        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
+        allNodes1.add(expectedSourceNode1);
+        allNodes1.add(expectedSinkNode1);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
+
+        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
+        allNodes2.add(expectedSourceNode2);
+        allNodes2.add(expectedSinkNode2);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
+
+        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
+        allNodes3.add(expectedSourceNode3);
+        allNodes3.add(expectedSinkNode3);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void processorsWithSameSinkShouldHaveSameSubtopology() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+        final TopologyDescription.Sink expectedSinkNode = addSink(
+            "sink",
+            "sinkTopic",
+            expectedProcessorNode1,
+            expectedProcessorNode2,
+            expectedProcessorNode3);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode2);
+        allNodes.add(expectedSourceNode3);
+        allNodes.add(expectedProcessorNode3);
+        allNodes.add(expectedSinkNode);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void processorsWithSharedStateShouldHaveSameSubtopology() {
+        final String[] store1 = new String[] {"store1"};
+        final String[] store2 = new String[] {"store2"};
+        final String[] bothStores = new String[] {store1[0], store2[0]};
+
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1
+            = addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2
+            = addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3
+            = addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode2);
+        allNodes.add(expectedSourceNode3);
+        allNodes.add(expectedProcessorNode3);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void shouldDescribeGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    @Test
+    public void shouldDescribeMultipleGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final String... sourceTopic) {
+        topology.addSource(null, sourceName, null, null, null, sourceTopic);
+        String allSourceTopics = sourceTopic[0];
+        for (int i = 1; i < sourceTopic.length; ++i) {
+            allSourceTopics += ", " + sourceTopic[i];
+        }
+        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
+    }
+
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final Pattern sourcePattern) {
+        topology.addSource(null, sourceName, null, null, null, sourcePattern);
+        return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
+    }
+
+    private TopologyDescription.Processor addProcessor(final String processorName,
+                                                       final TopologyDescription.Node... parents) {
+        return addProcessorWithNewStore(processorName, new String[0], parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
+                                                                   final String[] storeNames,
+                                                                   final TopologyDescription.Node... parents) {
+        return addProcessorWithStore(processorName, storeNames, true, parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
+                                                                        final String[] storeNames,
+                                                                        final TopologyDescription.Node... parents) {
+        return addProcessorWithStore(processorName, storeNames, false, parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithStore(final String processorName,
+                                                                final String[] storeNames,
+                                                                final boolean newStores,
+                                                                final TopologyDescription.Node... parents) {
+        final String[] parentNames = new String[parents.length];
+        for (int i = 0; i < parents.length; ++i) {
+            parentNames[i] = parents[i].name();
+        }
+
+        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+        if (newStores) {
+            for (final String store : storeNames) {
+                topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
+            }
+        } else {
+            topology.connectProcessorAndStateStores(processorName, storeNames);
+        }
+        final TopologyDescription.Processor expectedProcessorNode
+            = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
+
+        for (final TopologyDescription.Node parent : parents) {
+            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode);
+            ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent);
+        }
+
+        return expectedProcessorNode;
+    }
+
+    private TopologyDescription.Sink addSink(final String sinkName,
+                                             final String sinkTopic,
+                                             final TopologyDescription.Node... parents) {
+        final String[] parentNames = new String[parents.length];
+        for (int i = 0; i < parents.length; ++i) {
+            parentNames[i] = parents[i].name();
+        }
+
+        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
+        final TopologyDescription.Sink expectedSinkNode
+            = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
+
+        for (final TopologyDescription.Node parent : parents) {
+            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode);
+            ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent);
+        }
+
+        return expectedSinkNode;
+    }
+
+    private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
+                                                                final String sourceName,
+                                                                final String globalTopicName,
+                                                                final String processorName) {
+        topology.addGlobalStore(
+            new MockStateStoreSupplier(globalStoreName, false, false),
+            sourceName,
+            null,
+            null,
+            null,
+            globalTopicName,
+            processorName,
+            new MockProcessorSupplier());
+
+        final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
+            sourceName,
+            processorName,
+            globalStoreName,
+            globalTopicName);
+
+        expectedDescription.addGlobalStore(expectedGlobalStore);
+    }
+}
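
A note on the assertion idiom in TopologyTest above: for the duplicate-registration checks, only the offending call is wrapped in try/fail/catch rather than using @Test(expected = TopologyException.class), so an unexpected exception from the setup calls fails the test instead of silently passing it. In sketch form, taken from the pattern above:

    topology.addSource("source", "topic-1");        // setup; must not throw
    try {
        topology.addSource("source", "topic-2");    // only this call may throw
        fail("Should throw TopologyException for duplicate source name");
    } catch (final TopologyException expected) { }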

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index a868839..3fecfc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
@@ -56,6 +57,7 @@ import java.util.regex.Pattern;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
 public class KStreamsFineGrainedAutoResetIntegrationTest {
@@ -238,26 +240,29 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         consumer.close();
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowExceptionOverlappingPattern() throws  Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
         builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
         builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
-        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
 
-        builder.earliestResetTopicsPattern();
+        try {
+            builder.earliestResetTopicsPattern();
+            fail("Should have thrown TopologyException");
+        } catch (final TopologyException expected) { }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowExceptionOverlappingTopic() throws  Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
         builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
-        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
-        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
 
-        builder.latestResetTopicsPattern();
+        try {
+            builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
+            fail("Should have thrown TopologyBuilderException");
+        } catch (final TopologyBuilderException expected) { }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a7ddb7b..0cd2f2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -57,9 +57,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class TopologyBuilderTest {
 
-
     @Test
     public void shouldAddSourceWithOffsetReset() {
         final TopologyBuilder builder = new TopologyBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
deleted file mode 100644
index a541eb3..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.streams.TopologyDescription;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-// TODO (remove this comment) Test name ok, we just use InternalTopologyBuilder for now in this test until Topology gets added
-public class TopologyTest {
-    // TODO change from InternalTopologyBuilder to Topology
-    private final InternalTopologyBuilder topology = new InternalTopologyBuilder();
-    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
-
-    @Test
-    public void shouldDescribeEmptyTopology() {
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void singleSourceShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
-
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void singleSourcePatternShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
-
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void multipleSourcesShouldHaveDistinctSubtopologies() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(0,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(1,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        expectedDescription.addSubtopology(
-            new InternalTopologyBuilder.Subtopology(2,
-                Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void sourceAndProcessorShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final String[] store = new String[] {"store"};
-        final TopologyDescription.Processor expectedProcessorNode
-            = addProcessorWithNewStore("processor", store, expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-
-    @Test
-    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final String[] stores = new String[] {"store1", "store2"};
-        final TopologyDescription.Processor expectedProcessorNode
-            = addProcessorWithNewStore("processor", stores, expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode);
-        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode);
-        allNodes.add(expectedProcessorNode1);
-        allNodes.add(expectedProcessorNode2);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
-        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode1, expectedSourceNode2);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode1);
-        allNodes.add(expectedSourceNode2);
-        allNodes.add(expectedProcessorNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
-        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
-
-        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
-        allNodes1.add(expectedSourceNode1);
-        allNodes1.add(expectedProcessorNode1);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
-
-        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
-        allNodes2.add(expectedSourceNode2);
-        allNodes2.add(expectedProcessorNode2);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
-
-        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
-        allNodes3.add(expectedSourceNode3);
-        allNodes3.add(expectedProcessorNode3);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
-        final TopologyDescription.Sink expectedSinkNode1 = addSink("sink1", "sinkTopic1", expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Sink expectedSinkNode2 = addSink("sink2", "sinkTopic2", expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Sink expectedSinkNode3 = addSink("sink3", "sinkTopic3", expectedSourceNode3);
-
-        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
-        allNodes1.add(expectedSourceNode1);
-        allNodes1.add(expectedSinkNode1);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
-
-        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
-        allNodes2.add(expectedSourceNode2);
-        allNodes2.add(expectedSinkNode2);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
-
-        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
-        allNodes3.add(expectedSourceNode3);
-        allNodes3.add(expectedSinkNode3);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void processorsWithSameSinkShouldHaveSameSubtopology() {
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
-
-        final TopologyDescription.Sink expectedSinkNode = addSink(
-            "sink",
-            "sinkTopic",
-            expectedProcessorNode1,
-            expectedProcessorNode2,
-            expectedProcessorNode3);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode1);
-        allNodes.add(expectedProcessorNode1);
-        allNodes.add(expectedSourceNode2);
-        allNodes.add(expectedProcessorNode2);
-        allNodes.add(expectedSourceNode3);
-        allNodes.add(expectedProcessorNode3);
-        allNodes.add(expectedSinkNode);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void processorsWithSharedStateShouldHaveSameSubtopology() {
-        final String[] store1 = new String[] {"store1"};
-        final String[] store2 = new String[] {"store2"};
-        final String[] bothStores = new String[] {store1[0], store2[0]};
-
-        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
-        final TopologyDescription.Processor expectedProcessorNode1
-            = addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
-
-        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
-        final TopologyDescription.Processor expectedProcessorNode2
-            = addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
-
-        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
-        final TopologyDescription.Processor expectedProcessorNode3
-            = addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
-
-        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
-        allNodes.add(expectedSourceNode1);
-        allNodes.add(expectedProcessorNode1);
-        allNodes.add(expectedSourceNode2);
-        allNodes.add(expectedProcessorNode2);
-        allNodes.add(expectedSourceNode3);
-        allNodes.add(expectedProcessorNode3);
-        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void shouldDescribeGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    @Test
-    public void shouldDescribeMultipleGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
-        assertThat(topology.describe(), equalTo(expectedDescription));
-    }
-
-    private TopologyDescription.Source addSource(final String sourceName,
-                                                 final String... sourceTopic) {
-        topology.addSource(null, sourceName, null, null, null, sourceTopic);
-        String allSourceTopics = sourceTopic[0];
-        for (int i = 1; i < sourceTopic.length; ++i) {
-            allSourceTopics += ", " + sourceTopic[i];
-        }
-        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
-    }
-
-    private TopologyDescription.Source addSource(final String sourceName,
-                                                 final Pattern sourcePattern) {
-        topology.addSource(null, sourceName, null, null, null, sourcePattern);
-        return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
-    }
-
-    private TopologyDescription.Processor addProcessor(final String processorName,
-                                                       final TopologyDescription.Node... parents) {
-        return addProcessorWithNewStore(processorName, new String[0], parents);
-    }
-
-    private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
-                                                                   final String[] storeNames,
-                                                                   final TopologyDescription.Node... parents) {
-        return addProcessorWithStore(processorName, storeNames, true, parents);
-    }
-
-    private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
-                                                                        final String[] storeNames,
-                                                                        final TopologyDescription.Node... parents) {
-        return addProcessorWithStore(processorName, storeNames, false, parents);
-    }
-
-    private TopologyDescription.Processor addProcessorWithStore(final String processorName,
-                                                                final String[] storeNames,
-                                                                final boolean newStores,
-                                                                final TopologyDescription.Node... parents) {
-        final String[] parentNames = new String[parents.length];
-        for (int i = 0; i < parents.length; ++i) {
-            parentNames[i] = parents[i].name();
-        }
-
-        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
-        if (newStores) {
-            for (final String store : storeNames) {
-                topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
-            }
-        } else {
-            topology.connectProcessorAndStateStores(processorName, storeNames);
-        }
-        final TopologyDescription.Processor expectedProcessorNode
-            = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
-
-        for (final TopologyDescription.Node parent : parents) {
-            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode);
-            ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent);
-        }
-
-        return expectedProcessorNode;
-    }
-
-    private TopologyDescription.Sink addSink(final String sinkName,
-                                             final String sinkTopic,
-                                             final TopologyDescription.Node... parents) {
-        final String[] parentNames = new String[parents.length];
-        for (int i = 0; i < parents.length; ++i) {
-            parentNames[i] = parents[i].name();
-        }
-
-        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
-        final TopologyDescription.Sink expectedSinkNode
-            = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
-
-        for (final TopologyDescription.Node parent : parents) {
-            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode);
-            ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent);
-        }
-
-        return expectedSinkNode;
-    }
-
-    private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
-                                                                final String sourceName,
-                                                                final String globalTopicName,
-                                                                final String processorName) {
-        topology.addGlobalStore(
-            new MockStateStoreSupplier(globalStoreName, false, false),
-            sourceName,
-            null,
-            null,
-            null,
-            globalTopicName,
-            processorName,
-            new MockProcessorSupplier());
-
-        final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
-            sourceName,
-            processorName,
-            globalStoreName,
-            globalTopicName);
-
-        expectedDescription.addGlobalStore(expectedGlobalStore);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b98b756..9bd8756 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,14 +19,15 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
@@ -67,8 +68,8 @@ public class InternalTopologyBuilderTest {
         final String earliestTopic = "earliestTopic";
         final String latestTopic = "latestTopic";
 
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
-        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
+        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
 
         assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
         assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
@@ -79,8 +80,8 @@ public class InternalTopologyBuilderTest {
         final String earliestTopicPattern = "earliest.*Topic";
         final String latestTopicPattern = "latest.*Topic";
 
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
-        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null,  Pattern.compile(latestTopicPattern));
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
+        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null,  Pattern.compile(latestTopicPattern));
 
         assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
         assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
@@ -108,18 +109,18 @@ public class InternalTopologyBuilderTest {
         assertEquals(builder.latestResetTopicsPattern().pattern(), "");
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowOffsetResetSourceWithoutTopics() {
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
     }
 
     @Test
     public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
         try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
-            fail("Should throw TopologyBuilderException for duplicate source name");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            builder.addSource(Topology.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+            fail("Should throw TopologyException for duplicate source name");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -127,8 +128,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic-1");
         try {
             builder.addSource(null, "source", null, null, null, "topic-2");
-            fail("Should throw TopologyBuilderException with source name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with source name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -136,8 +137,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic-1");
         try {
             builder.addSource(null, "source-2", null, null, null, "topic-1");
-            fail("Should throw TopologyBuilderException with topic conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with topic conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -146,16 +147,16 @@ public class InternalTopologyBuilderTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         try {
             builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-            fail("Should throw TopologyBuilderException with processor name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with processor name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddProcessorWithWrongParent() {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddProcessorWithSelfParent() {
         builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
     }
@@ -166,16 +167,16 @@ public class InternalTopologyBuilderTest {
         builder.addSink("sink", "topic-2", null, null, null, "source");
         try {
             builder.addSink("sink", "topic-3", null, null, null, "source");
-            fail("Should throw TopologyBuilderException with sink name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with sink name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddSinkWithWrongParent() {
         builder.addSink("sink", "topic-2", null, null, null, "source");
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddSinkWithSelfParent() {
         builder.addSink("sink", "topic-2", null, null, null, "sink");
     }
@@ -247,8 +248,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, "foo");
         try {
             builder.addSource(null, "source-2", null, null, null, Pattern.compile("f.*"));
-            fail("Should throw TopologyBuilderException with topic name/pattern conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with topic name/pattern conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -256,11 +257,11 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, Pattern.compile("f.*"));
         try {
             builder.addSource(null, "source-2", null, null, null, "foo");
-            fail("Should throw TopologyBuilderException with topic name/pattern conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with topic name/pattern conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void testAddStateStoreWithNonExistingProcessor() {
         builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processor");
     }
@@ -270,8 +271,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         try {
             builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
-            fail("Should throw TopologyBuilderException with store cannot be added to source");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with store cannot be added to source");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -279,8 +280,8 @@ public class InternalTopologyBuilderTest {
         builder.addSink("sink-1", "topic-1", null, null, null);
         try {
             builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
-            fail("Should throw TopologyBuilderException with store cannot be added to sink");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with store cannot be added to sink");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -288,8 +289,8 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(new MockStateStoreSupplier("store", false));
         try {
             builder.addStateStore(new MockStateStoreSupplier("store", false));
-            fail("Should throw TopologyBuilderException with store name conflict");
-        } catch (final TopologyBuilderException expected) { /* ok */ }
+            fail("Should throw TopologyException with store name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
     }
 
     @Test
@@ -326,12 +327,12 @@ public class InternalTopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -363,13 +364,13 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
         final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
         final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
         final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
-        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(
+        expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
             Collections.<String, InternalTopicConfig>emptyMap(),
             Collections.singletonMap(
@@ -378,7 +379,7 @@ public class InternalTopologyBuilderTest {
                     store1,
                     Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                     Collections.<String, String>emptyMap()))));
-        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(
+        expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
             Collections.<String, InternalTopicConfig>emptyMap(),
             Collections.singletonMap(
@@ -387,7 +388,7 @@ public class InternalTopologyBuilderTest {
                     store2,
                     Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                     Collections.<String, String>emptyMap()))));
-        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(
+        expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-5"),
             Collections.<String, InternalTopicConfig>emptyMap(),
             Collections.singletonMap(store3,
@@ -519,8 +520,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
-        final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
         final Properties properties = topicConfig.toProperties(0);
         final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
@@ -539,8 +540,8 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
-        final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
         final Properties properties = topicConfig.toProperties(0);
         assertEquals("appId-name-changelog", topicConfig.name());
@@ -554,7 +555,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
         builder.addSource(null, "source", null, null, null, "foo");
-        final TopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Properties properties = topicConfig.toProperties(0);
         assertEquals("appId-foo", topicConfig.name());
@@ -562,8 +563,9 @@ public class InternalTopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
-    @Test(expected = TopologyBuilderException.class)
-    public void shouldThroughOnUnassignedStateStoreAccess() {
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -573,24 +575,22 @@ public class InternalTopologyBuilderTest {
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
+        builder.addSource(null, sourceNodeName, null, null, null, "topic");
+        builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+        builder.addStateStore(
+            Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+            goodNodeName);
+        builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
         try {
-            builder.addSource(null, sourceNodeName, null, null, null, "topic");
-            builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-            builder.addStateStore(
-                    Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
-                    goodNodeName);
-            builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
-            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
-            driver.process("topic", null, null);
-        } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. Did catch:", e);
+            new ProcessorTopologyTestDriver(streamsConfig, builder);
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException expected) {
+            final Throwable cause = expected.getCause();
+            if (cause == null
+                || !(cause instanceof TopologyBuilderException)
+                || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
+                throw new RuntimeException("Expected a different exception. Caught:", expected);
             }
         }
     }
@@ -639,7 +639,7 @@ public class InternalTopologyBuilderTest {
         builder.updateSubscriptions(subscriptionUpdates, null);
         builder.setApplicationId("test-id");
 
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
         assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
         assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
@@ -693,7 +693,7 @@ public class InternalTopologyBuilderTest {
         assertFalse(topics.contains("topic-A"));
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
         builder.addGlobalStore(

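The test migration above also shows the new exception type in action: invalid wiring now surfaces as the new org.apache.kafka.streams.errors.TopologyException instead of TopologyBuilderException. A minimal sketch of the behavior the tests exercise, with illustrative topic and node names:

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.TopologyException;

    final Topology topology = new Topology();
    topology.addSource("source", "topic-1");
    try {
        topology.addSource("source", "topic-2"); // re-uses the node name "source"
    } catch (final TopologyException expected) {
        // invalid wiring is rejected when the node is added, with the new exception type
    }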
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index f173a65..33e43e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
@@ -526,18 +525,18 @@ public class StreamPartitionAssignorTest {
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
 
         assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
         assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
         assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
     }
 
-    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
         final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
 
         Set<TaskId> ids = new HashSet<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
 
             if (stateChangelogTopics.contains(changelogTopic)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ce25e67..55f8728 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -178,7 +178,7 @@ public class ProcessorTopologyTestDriver {
         };
 
         // Identify internal topics for forwarding in process ...
-        for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
+        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
             internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
         }
 


[3/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder

Posted by gu...@apache.org.
KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3590 from mjsax/kafka-3856-replace-topology-builder-by-topology


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1844bf2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1844bf2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1844bf2b

Branch: refs/heads/trunk
Commit: 1844bf2b2f4cdf5a8209d7ceccb6701fc7dcf768
Parents: c50c941
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Jul 28 16:46:34 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jul 28 16:46:34 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  63 +-
 .../wordcount/WordCountProcessorDemo.java       |   6 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |  71 +-
 .../java/org/apache/kafka/streams/Topology.java | 640 ++++++++++++++++++
 .../kafka/streams/TopologyDescription.java      |   7 +-
 .../errors/TopologyBuilderException.java        |   5 +-
 .../kafka/streams/errors/TopologyException.java |  40 ++
 .../apache/kafka/streams/kstream/KStream.java   |   4 +-
 .../kstream/internals/KStreamTransform.java     |   1 +
 .../internals/KStreamTransformValues.java       |   2 +
 .../streams/kstream/internals/KTableImpl.java   |  10 +-
 .../streams/processor/AbstractProcessor.java    |   1 +
 .../streams/processor/TopologyBuilder.java      | 221 +++---
 .../internals/GlobalProcessorContextImpl.java   |   5 +-
 .../internals/InternalTopologyBuilder.java      | 242 ++++---
 .../internals/ProcessorContextImpl.java         |   1 +
 .../internals/SourceNodeRecordDeserializer.java |   1 +
 .../internals/StreamPartitionAssignor.java      |  15 +-
 .../org/apache/kafka/streams/TopologyTest.java  | 671 +++++++++++++++++++
 ...eamsFineGrainedAutoResetIntegrationTest.java |  19 +-
 .../streams/processor/TopologyBuilderTest.java  |   2 +-
 .../kafka/streams/processor/TopologyTest.java   | 408 -----------
 .../internals/InternalTopologyBuilderTest.java  | 140 ++--
 .../internals/StreamPartitionAssignorTest.java  |   7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 25 files changed, 1865 insertions(+), 719 deletions(-)
----------------------------------------------------------------------
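Before the file-by-file diff, a minimal before/after sketch of the migration this commit enables; the processor class, variable, and topic names are illustrative placeholders, not part of the patch:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    Properties props = new Properties(); // application.id, bootstrap.servers, ...

    // Before this commit (TopologyBuilder, now deprecated):
    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("Source", "input-topic");
    builder.addProcessor("Process", MyProcessor::new, "Source"); // MyProcessor is a placeholder Processor implementation
    builder.addSink("Sink", "output-topic", "Process");
    KafkaStreams oldStyle = new KafkaStreams(builder, props); // deprecated constructor

    // After this commit (KIP-120 Topology):
    Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    topology.addProcessor("Process", MyProcessor::new, "Source");
    topology.addSink("Sink", "output-topic", "Process");
    KafkaStreams newStyle = new KafkaStreams(topology, props);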


http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 542003d..76c2dc7 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -123,14 +123,14 @@
     <h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
 
     <p>
-        With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
+        With the customized processors defined in the Processor API, developers can use a <code>Topology</code> to build a processor topology
         by connecting these processors together:
     </p>
 
     <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+    Topology topology = new Topology();
 
-    builder.addSource("SOURCE", "src-topic")
+    topology.addSource("SOURCE", "src-topic")
     // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
     .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
@@ -180,15 +180,15 @@
     </pre>
 
     <p>
-        To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+        To take advantage of these state stores, developers can use the <code>Topology.addStateStore</code> method when building the
         processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created
-        state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+        state store with the existing processor nodes through <code>Topology.connectProcessorAndStateStores</code>.
     </p>
 
     <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+    Topology topology = new Topology();
 
-    builder.addSource("SOURCE", "src-topic")
+    topology.addSource("SOURCE", "src-topic")
 
     .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
     // add the created state store "COUNTS" associated with processor "PROCESS1"
@@ -204,12 +204,32 @@
     .addSink("SINK3", "sink-topic3", "PROCESS3");
     </pre>
 
+    <h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
+
+    <p>
+        After a <code>Topology</code> is specified, it is possible to retrieve a description of the corresponding DAG via <code>#describe()</code>, which returns a <code>TopologyDescription</code>.
+        A <code>TopologyDescription</code> contains all added source, processor, and sink nodes as well as all attached stores.
+        For source and sink nodes, one can access the specified input/output topic name or pattern.
+        For processor nodes, the attached stores are added to the description.
+        Additionally, each node has a list of all its connected successor and predecessor nodes.
+        Thus, <code>TopologyDescription</code> allows you to retrieve the DAG structure of the specified topology (see the usage sketch below).
+        <br />
+        Note that global stores are listed explicitly because they are accessible by all nodes without the need to connect them explicitly.
+        Furthermore, nodes are grouped by <code>Subtopology</code>.
+        Subtopologies are groups of nodes that are directly connected to each other, either through a parent-child relationship or through a shared store, but not through a topic.
+        For execution, each <code>Subtopology</code> is executed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
+        Thus, each <code>Subtopology</code> describes an independent unit of work that can be executed by different threads in parallel.
+        <br />
+        Describing a <code>Topology</code> is helpful for reasoning about tasks and thus the maximum parallelism.
+        It is also helpful for getting insight into a <code>Topology</code> that is not specified manually but via the Kafka Streams DSL, which is described in the next section.
+    </p>
+
     In the next section we present another way to build the processor topology: the Kafka Streams DSL.
     <br>
 
     <h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
 
-    To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which is extended from the <code>TopologyBuilder</code>.
+    To build a <code>Topology</code> using the Streams DSL, developers can use the <code>StreamsBuilder</code> class.
     A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
     through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full
     example source code for details.
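The description facility added above can be exercised with a short sketch like the following; the processor class and topic names are illustrative placeholders, and printing the description via toString() is an assumption rather than something this patch shows:

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyDescription;

    Topology topology = new Topology();
    topology.addSource("SOURCE", "src-topic");
    topology.addProcessor("PROCESS1", MyProcessor1::new, "SOURCE"); // MyProcessor1 is a placeholder Processor implementation
    topology.addSink("SINK1", "sink-topic1", "PROCESS1");

    // Retrieve the logical DAG: all sources, processors, sinks, and attached
    // stores, grouped by subtopology.
    TopologyDescription description = topology.describe();
    System.out.println(description); // assumes a human-readable toString()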
@@ -755,19 +775,19 @@
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          TopologyBuilder builder = ...;
+          Topology topology = ...;
          ProcessorSupplier processorSupplier = ...;
 
           // Create CustomStoreSupplier for store name the-custom-store
          MyCustomStoreSupplier customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
           // Add the source topic
-          builder.addSource("input", "inputTopic");
+          topology.addSource("input", "inputTopic");
           // Add a custom processor that reads from the source topic
-          builder.addProcessor("the-processor", processorSupplier, "input");
+          topology.addProcessor("the-processor", processorSupplier, "input");
           // Connect your custom state store to the custom processor above
-          builder.addStateStore(customStoreSupplier, "the-processor");
+          topology.addStateStore(customStoreSupplier, "the-processor");
 
-          KafkaStreams streams = new KafkaStreams(builder, config);
+          KafkaStreams streams = new KafkaStreams(topology, config);
           streams.start();
 
           // Get access to the custom store
@@ -1053,33 +1073,36 @@
     </p>
 
     <p>
-        First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
-        builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
-        that is used to define a topology; The second argument is an instance of <code>StreamsConfig</code> mentioned above.
+        First, you must create an instance of <code>KafkaStreams</code>.
+        The first argument of the <code>KafkaStreams</code> constructor is a <code>Topology</code>,
+        i.e., a logical description of the processing topology (you can create a <code>Topology</code>
+        either directly or via <code>StreamsBuilder</code>).
+        The second argument is an instance of <code>StreamsConfig</code> mentioned above.
     </p>
 
     <pre class="brush: java;">
     import org.apache.kafka.streams.KafkaStreams;
     import org.apache.kafka.streams.StreamsConfig;
     import org.apache.kafka.streams.kstream.KStreamBuilder;
-    import org.apache.kafka.streams.processor.TopologyBuilder;
+    import org.apache.kafka.streams.Topology;
 
     // Use the builders to define the actual processing topology, e.g. to specify
     // from which input topics to read, which stream operations (filter, map, etc.)
     // should be called, and so on.
 
-    KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
+    Topology topology = ...; // when using the Processor API
     //
     // OR
     //
-    TopologyBuilder builder = ...; // when using the Processor API
+    KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
+    Topology topology = builder.topology();
 
     // Use the configuration to tell your application where the Kafka cluster is,
     // which serializers/deserializers to use by default, to specify security settings,
     // and so on.
     StreamsConfig config = ...;
 
-    KafkaStreams streams = new KafkaStreams(builder, config);
+    KafkaStreams streams = new KafkaStreams(topology, config);
     </pre>
 
     <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 0ff42a7..34bb8bb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -19,15 +19,15 @@ package org.apache.kafka.streams.examples.wordcount;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -119,7 +119,7 @@ public class WordCountProcessorDemo {
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        TopologyBuilder builder = new TopologyBuilder();
+        Topology builder = new Topology();
 
         builder.addSource("Source", "streams-wordcount-input");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 0d88efb..d7c608a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -121,7 +122,7 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
  * }</pre>
  *
  * @see KStreamBuilder
- * @see TopologyBuilder
+ * @see Topology
  */
 @InterfaceStability.Evolving
 public class KafkaStreams {
@@ -402,36 +403,72 @@ public class KafkaStreams {
     }
 
     /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final TopologyBuilder builder,
+                        final Properties props) {
+        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final TopologyBuilder builder,
+                        final StreamsConfig config) {
+        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final TopologyBuilder builder,
+                        final StreamsConfig config,
+                        final KafkaClientSupplier clientSupplier) {
+        this(builder.internalTopologyBuilder, config, clientSupplier);
+    }
+
+    /**
      * Create a {@code KafkaStreams} instance.
      *
-     * @param builder the processor topology builder specifying the computational logic
+     * @param topology the topology specifying the computational logic
      * @param props   properties for {@link StreamsConfig}
      */
-    public KafkaStreams(final TopologyBuilder builder, final Properties props) {
-        this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+    public KafkaStreams(final Topology topology,
+                        final Properties props) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
      * Create a {@code KafkaStreams} instance.
      *
-     * @param builder the processor topology builder specifying the computational logic
+     * @param topology the topology specifying the computational logic
      * @param config  the Kafka Streams configuration
      */
-    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) {
-        this(builder, config, new DefaultKafkaClientSupplier());
+    public KafkaStreams(final Topology topology,
+                        final StreamsConfig config) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
     }
 
     /**
      * Create a {@code KafkaStreams} instance.
      *
-     * @param builder        the processor topology builder specifying the computational logic
+     * @param topology       the topology specifying the computational logic
      * @param config         the Kafka Streams configuration
      * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
      *                       for the new {@code KafkaStreams} instance
      */
-    public KafkaStreams(final TopologyBuilder builder,
+    public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
+        this(topology.internalTopologyBuilder, config, clientSupplier);
+    }
+
+    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+                         final StreamsConfig config,
+                         final KafkaClientSupplier clientSupplier) {
         // create the metrics
         final Time time = Time.SYSTEM;
 
@@ -442,7 +479,7 @@ public class KafkaStreams {
         // The application ID is a required config and hence should always have value
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
-        builder.setApplicationId(applicationId);
+        internalTopologyBuilder.setApplicationId(applicationId);
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
@@ -466,16 +503,16 @@ public class KafkaStreams {
         GlobalStreamThread.State globalThreadState = null;
 
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
-        streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
-        final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
         }
 
         final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
-                (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
+            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
 
         stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
         if (globalTaskTopology != null) {
@@ -491,7 +528,7 @@ public class KafkaStreams {
         }
 
         for (int i = 0; i < threads.length; i++) {
-            threads[i] = new StreamThread(builder.internalTopologyBuilder,
+            threads[i] = new StreamThread(internalTopologyBuilder,
                                           config,
                                           clientSupplier,
                                           applicationId,
@@ -509,11 +546,11 @@ public class KafkaStreams {
         if (globalTaskTopology != null) {
             globalStreamThread.setStateListener(streamStateListener);
         }
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].setStateListener(streamStateListener);
+        for (StreamThread thread : threads) {
+            thread.setStateListener(streamStateListener);
         }
 
-        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
+        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
         final String cleanupThreadName = clientId + "-CleanupThread";
         stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
new file mode 100644
index 0000000..ca0ac75
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.regex.Pattern;
+
+/**
+ * A logical representation of a {@link ProcessorTopology}.
+ * A topology is an acyclic graph of sources, processors, and sinks.
+ * A {@link SourceNode source} is a node in the graph that consumes records from one or more Kafka topics and forwards
+ * them to its successor nodes.
+ * A {@link Processor processor} is a node in the graph that receives input records from upstream nodes, processes the
+ * records, and optionally forwards new records to one or all of its downstream nodes.
+ * Finally, a {@link SinkNode sink} is a node in the graph that receives records from upstream nodes and writes them to
+ * a Kafka topic.
+ * A {@code Topology} allows you to construct an acyclic graph of these nodes, which can then be passed into a new
+ * {@link KafkaStreams} instance that will {@link KafkaStreams#start() begin consuming, processing, and producing
+ * records}.
+ */
+public class Topology {
+
+    final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
+
+    /**
+     * Enum used to define the auto offset reset policy when creating a {@link KStream} or {@link KTable}
+     */
+    public enum AutoOffsetReset {
+        EARLIEST, LATEST
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if a processor with the same name was already added, or if the topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if a processor with the same name was already added, or if the topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if a processor with the same name was already added, or if the topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest.
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if a processor with the same name was already added, or if the topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this source if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param offsetReset        the auto offset reset policy value for this source if no committed offsets are found;
+     *                           acceptable values are earliest or latest.
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
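+     * For example, a sketch reading String keys and values (assuming a {@code Topology} instance {@code topology};
+     * node and topic names are illustrative):
+     * <pre>{@code
+     * topology.addSource("source-node", new StringDeserializer(), new StringDeserializer(), "input-topic");
+     * }</pre>
+     *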
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided deserializers will be used for all matched topics, so care should be taken to use a pattern that
+     * only matches topics sharing the same key-value data format.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided deserializers will be used for all matched topics, so care should be taken to use a pattern that
+     * only matches topics sharing the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided deserializers will be used for all matched topics, so care should be taken to use a pattern that
+     * only matches topics sharing the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest.
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final TimestampExtractor timestampExtractor,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided deserializers will be used for all matched topics, so care should be taken to use a pattern that
+     * only matches topics sharing the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final TimestampExtractor timestampExtractor,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
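+     * For example, a sketch wiring a sink to an upstream source (assuming a {@code Topology} instance {@code topology};
+     * all names are illustrative):
+     * <pre>{@code
+     * topology.addSource("source-node", "input-topic")
+     *         .addSink("sink-node", "output-topic", "source-node");
+     * }</pre>
+     *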
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic,
+     * using the supplied partitioner.
+     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * <p>
+     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
+     * the named Kafka topic's partitions.
+     * Such control is often useful with topologies that use {@link #addStateStore(StateStoreSupplier, String...) state
+     * stores} in their processors.
+     * In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
+     * records among partitions using Kafka's default partitioning logic.
+     *
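+     * For example, a sketch with a custom partitioner that routes by key hash (assuming a {@code Topology} instance
+     * {@code topology}; all names are illustrative):
+     * <pre>{@code
+     * StreamPartitioner<String, String> partitioner =
+     *     (key, value, numPartitions) -> (key.hashCode() & 0x7fffffff) % numPartitions;
+     * topology.addSink("sink-node", "output-topic", partitioner, "processor-node");
+     * }</pre>
+     *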
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final StreamPartitioner partitioner,
+                                         final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param keySerializer the {@link Serializer key serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param valueSerializer the {@link Serializer value serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final Serializer keySerializer,
+                                         final Serializer valueSerializer,
+                                         final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers, and the supplied partitioner.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param keySerializer the {@link Serializer key serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param valueSerializer the {@link Serializer value serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final String topic,
+                                                final Serializer<K> keySerializer,
+                                                final Serializer<V> valueSerializer,
+                                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, partitioner, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new processor node that receives and processes records output by one or more parent source or processor
+     * nodes.
+     * Any new record output by this processor will be forwarded to its child processor or sink nodes.
+     *
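+     * For example, a sketch adding a processor downstream of a source ({@code MyProcessor} stands in for a
+     * user-supplied {@link Processor} implementation; all names are illustrative):
+     * <pre>{@code
+     * topology.addSource("source-node", "input-topic")
+     *         .addProcessor("processor-node", MyProcessor::new, "source-node");
+     * }</pre>
+     *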
+     * @param name the unique name of the processor node
+     * @param supplier the supplier used to obtain this node's {@link Processor} instance
+     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
+     * and process
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     */
+    public synchronized Topology addProcessor(final String name,
+                                              final ProcessorSupplier supplier,
+                                              final String... parentNames) {
+        internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+        return this;
+    }
+
+    /**
+     * Adds a state store.
+     *
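+     * For example, a sketch attaching a persistent key-value store to an already-added processor via the
+     * {@link org.apache.kafka.streams.state.Stores} factory (all names are illustrative):
+     * <pre>{@code
+     * StateStoreSupplier storeSupplier = Stores.create("my-store")
+     *     .withStringKeys().withStringValues().persistent().build();
+     * topology.addStateStore(storeSupplier, "processor-node");
+     * }</pre>
+     *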
+     * @param supplier the supplier used to obtain this node's {@link StateStore} instance
+     * @param processorNames the names of the processors that should be able to access the provided store
+     * @return itself
+     * @throws TopologyException if state store supplier is already added
+     */
+    public synchronized Topology addStateStore(final StateStoreSupplier supplier,
+                                               final String... processorNames) {
+        internalTopologyBuilder.addStateStore(supplier, processorNames);
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
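+     * For example, a sketch assuming {@code storeSupplier} is a {@link StateStoreSupplier} for a
+     * {@link KeyValueStore} and {@code GlobalStoreUpdater} is a user-supplied {@link ProcessorSupplier}
+     * (all names are illustrative):
+     * <pre>{@code
+     * topology.addGlobalStore(storeSupplier, "global-source", new StringDeserializer(),
+     *     new StringDeserializer(), "global-topic", "global-processor", new GlobalStoreUpdater());
+     * }</pre>
+     *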
+     * @param storeSupplier         user defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that will be automatically added
+     * @param stateUpdateSupplier   the supplier of the {@link Processor} that keeps the {@link StateStore} up-to-date
+     * @return itself
+     * @throws TopologyException if the processor or the state store is already registered
+     */
+    public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                final String sourceName,
+                                                final Deserializer keyDeserializer,
+                                                final Deserializer valueDeserializer,
+                                                final String topic,
+                                                final String processorName,
+                                                final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeSupplier         user defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that will be automatically added
+     * @param stateUpdateSupplier   the supplier of the {@link Processor} that keeps the {@link StateStore} up-to-date
+     * @return itself
+     * @throws TopologyException if the processor or the state store is already registered
+     */
+    public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                final String sourceName,
+                                                final TimestampExtractor timestampExtractor,
+                                                final Deserializer keyDeserializer,
+                                                final Deserializer valueDeserializer,
+                                                final String topic,
+                                                final String processorName,
+                                                final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
+    }
+
+    /**
+     * Connects the processor and the state stores.
+     *
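+     * For example, a sketch connecting an already-added processor to two stores (all names are illustrative):
+     * <pre>{@code
+     * topology.connectProcessorAndStateStores("processor-node", "store-one", "store-two");
+     * }</pre>
+     *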
+     * @param processorName the name of the processor
+     * @param stateStoreNames the names of state stores that the processor uses
+     * @return itself
+     * @throws TopologyException if the processor or a state store is unknown
+     */
+    public synchronized Topology connectProcessorAndStateStores(final String processorName,
+                                                                final String... stateStoreNames) {
+        internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+        return this;
+    }
+
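+    /**
+     * Returns a description of this {@code Topology}, e.g., for logging or visualizing the topology structure.
+     *
+     * @return a description of this topology
+     */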
+    public synchronized TopologyDescription describe() {
+        return internalTopologyBuilder.describe();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index dd481ff..725b7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -35,8 +35,9 @@ public interface TopologyDescription {
     /**
      * A connected sub-graph of a {@link Topology}.
      * <p>
-     * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
-     * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
+     * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String,
+     * org.apache.kafka.streams.processor.ProcessorSupplier, String...) directly} or indirectly via
+     * {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
      * (i.e., if multiple processors share the same state).
      */
     interface Subtopology {
@@ -54,7 +55,7 @@ public interface TopologyDescription {
     }
 
     /**
-     * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
+     * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String,
      * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
      * String, ProcessorSupplier) global store}.
      * Adding a global store results in adding a source node and one stateful processor node.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
index b9c0c3a..385d401 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -18,9 +18,12 @@ package org.apache.kafka.streams.errors;
 
 
 /**
- * Indicates a pre-run time error incurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
+ * Indicates a pre-run time error occurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
  * builder} to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology processor topology}.
+ *
+ * @deprecated use {@link org.apache.kafka.streams.Topology} instead of {@link org.apache.kafka.streams.processor.TopologyBuilder}
  */
+@Deprecated
 public class TopologyBuilderException extends StreamsException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
new file mode 100644
index 0000000..1eaef06
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+/**
+ * Indicates a pre-runtime error occurred while parsing the {@link org.apache.kafka.streams.Topology logical topology}
+ * to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology physical processor topology}.
+ */
+public class TopologyException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TopologyException(final String message) {
+        super("Invalid topology" + (message == null ? "" : ": " + message));
+    }
+
+    public TopologyException(final String message,
+                             final Throwable throwable) {
+        super("Invalid topology" + (message == null ? "" : ": " + message), throwable);
+    }
+
+    public TopologyException(final Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 191931b..535e1e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -22,13 +22,13 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 
 /**
  * {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
  * <p>
  * A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable},
  * {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
- * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link TopologyBuilder}) via
+ * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via
  * {@link #process(ProcessorSupplier, String...) process(...)},
  * {@link #transform(TransformerSupplier, String...) transform(...)}, and
  * {@link #transformValues(ValueTransformerSupplier, String...) transformValues(...)}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 93cf410..0afadbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -59,6 +59,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
                 context().forward(pair.key, pair.value);
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public void punctuate(long timestamp) {
             KeyValue<? extends K2, ? extends V2> pair = transformer.punctuate(timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index a6e9aaf..ab1c302 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -105,6 +105,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                         return context.schedule(interval, type, callback);
                     }
 
+                    @SuppressWarnings("deprecation")
                     @Override
                     public void schedule(final long interval) {
                         context.schedule(interval);
@@ -168,6 +169,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
             context.forward(key, valueTransformer.transform(value));
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public void punctuate(long timestamp) {
             if (valueTransformer.punctuate(timestamp) != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 679efe5..048670e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -238,22 +238,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doMapValues(mapper, valueSerde, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print() {
         print(null, null, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(String label) {
         print(null, null, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde) {
         print(keySerde, valSerde, this.name);
     }
 
-
+    @SuppressWarnings("deprecation")
     @Override
     public void print(Serde<K> keySerde, final Serde<V> valSerde, String label) {
         Objects.requireNonNull(label, "label can't be null");
@@ -261,16 +264,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath) {
         writeAsText(filePath, this.name, null, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath, String label) {
         writeAsText(filePath, label, null, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
         writeAsText(filePath, this.name, keySerde, valSerde);
@@ -279,6 +285,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     /**
      * @throws TopologyBuilderException if file is not found
      */
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) {
         Objects.requireNonNull(filePath, "filePath can't be null");
@@ -296,6 +303,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 49b3c18..1cfe78a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -44,6 +44,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
      *
      * @param timestamp the wallclock time when this method is being called
      */
+    @SuppressWarnings("deprecation")
     @Override
     public void punctuate(long timestamp) {
         // do nothing


[2/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder

Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ce6ba7b..e6c0d6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -16,11 +16,12 @@
  */
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.Subs
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,24 +48,29 @@ import java.util.regex.Pattern;
  * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
  * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
  * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
+ *
+ * @deprecated use {@link Topology} instead
  */
-@InterfaceStability.Evolving
+@Deprecated
 public class TopologyBuilder {
 
     /**
      * NOTE this member is not needed by developers working with the processor APIs, but is only used
      * for internal functionality.
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
 
+    private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+        if (resetPolicy == null) {
+            return null;
+        }
+        return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+    }
+
     /**
      * NOTE this class is not needed by developers working with the processor APIs, but is only used
      * for internal functionality.
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
@@ -108,7 +115,7 @@ public class TopologyBuilder {
     }
 
     /**
-     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}.
      */
     public enum AutoOffsetReset {
         EARLIEST, LATEST
@@ -119,8 +126,7 @@ public class TopologyBuilder {
      */
     public TopologyBuilder() {}
 
-    /** @deprecated This class is not part of public API and should never be used by a developer. */
-    @Deprecated
+    /** This class is not part of public API and should never be used by a developer. */
     public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
         internalTopologyBuilder.setApplicationId(applicationId);
         return this;
@@ -140,7 +146,11 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -160,7 +170,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -178,8 +192,13 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
-                                                        final String name, final String... topics) {
-        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+                                                        final String name,
+                                                        final String... topics) {
+        try {
+            internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -199,8 +218,14 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final TimestampExtractor timestampExtractor, final String name, final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+                                                        final TimestampExtractor timestampExtractor,
+                                                        final String name,
+                                                        final String... topics) {
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -219,7 +244,11 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -240,7 +269,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -262,11 +295,14 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
                                                         final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
-
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forwards the records to child processor and/or sink nodes.
@@ -287,11 +323,14 @@ public class TopologyBuilder {
                                                         final TimestampExtractor timestampExtractor,
                                                         final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
-
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
@@ -307,12 +346,15 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -334,14 +376,17 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final TimestampExtractor timestampExtractor,
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -374,7 +419,11 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        try {
+            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -409,7 +458,11 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        try {
+            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -435,7 +488,11 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -466,7 +523,11 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -494,11 +555,14 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
-
     /**
      * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
@@ -517,7 +581,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSink(final String name,
                                                       final String topic,
                                                       final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -548,7 +616,11 @@ public class TopologyBuilder {
                                                       final String topic,
                                                       final StreamPartitioner partitioner,
                                                       final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -576,7 +648,11 @@ public class TopologyBuilder {
                                                       final Serializer keySerializer,
                                                       final Serializer valSerializer,
                                                       final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -607,7 +683,11 @@ public class TopologyBuilder {
                                                              final Serializer<V> valSerializer,
                                                              final StreamPartitioner<? super K, ? super V> partitioner,
                                                              final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -624,7 +704,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addProcessor(final String name,
                                                            final ProcessorSupplier supplier,
                                                            final String... predecessorNames) {
-        internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+        try {
+            internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -637,7 +721,11 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
                                                             final String... processorNames) {
-        internalTopologyBuilder.addStateStore(supplier, processorNames);
+        try {
+            internalTopologyBuilder.addStateStore(supplier, processorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -650,7 +738,13 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
                                                                              final String... stateStoreNames) {
-        internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+        if (stateStoreNames != null && stateStoreNames.length > 0) {
+            try {
+                internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+            } catch (final TopologyException e) {
+                throw new TopologyBuilderException(e);
+            }
+        }
         return this;
     }
 
@@ -660,10 +754,7 @@ public class TopologyBuilder {
      *
      * NOTE this function would not be needed by developers working with the processor APIs, but is only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
                                                                             final String topic) {
         internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
@@ -679,9 +770,7 @@ public class TopologyBuilder {
      * @param processorNames the names of the processors
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if fewer than two processors are specified, or if one of the processors has not been added yet
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
         internalTopologyBuilder.connectProcessors(processorNames);
         return this;
@@ -695,9 +784,7 @@ public class TopologyBuilder {
      *
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
         internalTopologyBuilder.addInternalTopic(topicName);
         return this;
@@ -711,9 +798,7 @@ public class TopologyBuilder {
      *
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
         internalTopologyBuilder.copartitionSources(sourceNodes);
         return this;
@@ -726,9 +811,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return groups of node names
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Map<Integer, Set<String>> nodeGroups() {
         return internalTopologyBuilder.nodeGroups();
     }
@@ -741,9 +824,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized ProcessorTopology build(final Integer topicGroupId) {
         return internalTopologyBuilder.build(topicGroupId);
     }
@@ -755,9 +836,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return ProcessorTopology
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized ProcessorTopology buildGlobalStateTopology() {
         return internalTopologyBuilder.buildGlobalStateTopology();
     }
@@ -770,9 +849,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return map containing all global {@link StateStore}s
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public Map<String, StateStore> globalStateStores() {
         return internalTopologyBuilder.globalStateStores();
     }
@@ -785,11 +862,22 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return groups of topic names
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        return internalTopologyBuilder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroupsWithNewTopicsInfo = internalTopologyBuilder.topicGroups();
+        final Map<Integer, TopicsInfo> topicGroupsWithDeprecatedTopicInfo = new HashMap<>();
+
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroupsWithNewTopicsInfo.entrySet()) {
+            final InternalTopologyBuilder.TopicsInfo newTopicsInfo = entry.getValue();
+
+            topicGroupsWithDeprecatedTopicInfo.put(entry.getKey(), new TopicsInfo(
+                newTopicsInfo.sinkTopics,
+                newTopicsInfo.sourceTopics,
+                newTopicsInfo.repartitionSourceTopics,
+                newTopicsInfo.stateChangelogTopics));
+        }
+
+        return topicGroupsWithDeprecatedTopicInfo;
     }
 
     /**
@@ -799,9 +887,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return the Pattern for matching all topics reading from earliest offset, never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Pattern earliestResetTopicsPattern() {
         return internalTopologyBuilder.earliestResetTopicsPattern();
     }
@@ -813,9 +899,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return the Pattern for matching all topics reading from latest offset, never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Pattern latestResetTopicsPattern() {
         return internalTopologyBuilder.latestResetTopicsPattern();
     }
@@ -825,9 +909,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return a mapping from state store name to a set of source topics.
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public Map<String, List<String>> stateStoreNameToSourceTopics() {
         return internalTopologyBuilder.stateStoreNameToSourceTopics();
     }
@@ -840,9 +922,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return groups of topic names
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Collection<Set<String>> copartitionGroups() {
         return internalTopologyBuilder.copartitionGroups();
     }
@@ -850,10 +930,7 @@ public class TopologyBuilder {
     /**
      * NOTE this function would not be needed by developers working with the processor APIs, but is only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public SubscriptionUpdates subscriptionUpdates() {
         return internalTopologyBuilder.subscriptionUpdates();
     }
@@ -861,10 +938,7 @@ public class TopologyBuilder {
     /**
      * NOTE this function would not be needed by developers working with the processor APIs, but is only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Pattern sourceTopicPattern() {
         return internalTopologyBuilder.sourceTopicPattern();
     }
@@ -872,10 +946,7 @@ public class TopologyBuilder {
     /**
      * NOTE this function would not be needed by developers working with the processor APIs, but is only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
                                                  final String threadId) {
         internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);

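For reference on the pattern repeated throughout the TopologyBuilder hunks above: the deprecated facade catches the new TopologyException from InternalTopologyBuilder and rethrows it wrapped in the deprecated TopologyBuilderException, so user code that catches the old exception type keeps compiling and working. A minimal sketch of the caller-side effect (the class, node, and topic names here are made up for illustration):

    import org.apache.kafka.streams.errors.TopologyBuilderException;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class DeprecatedFacadeExample {
        public static void main(final String[] args) {
            final TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("my-source", "input-topic");
            try {
                // reusing a node name is rejected by the internal builder
                builder.addSource("my-source", "other-topic");
            } catch (final TopologyBuilderException e) {
                // the cause is the TopologyException thrown internally
                System.err.println("invalid topology: " + e.getCause().getMessage());
            }
        }
    }
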
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 4c1d350..7925b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -38,9 +37,6 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         super(new TaskId(-1, -1), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), config, metrics, stateMgr, cache);
     }
 
-    /**
-     * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
-     */
     @Override
     public StateStore getStateStore(final String name) {
         return stateManager.getGlobalStore(name);
@@ -95,6 +91,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     /**
      * @throws UnsupportedOperationException on every invocation
      */
+    @SuppressWarnings("deprecation")
     @Override
     public void schedule(long interval) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");

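A side note on the schedule(long) suppression above: the one-argument overload is deprecated in favor of the Punctuator-based variant (hence the Cancellable, PunctuationType, and Punctuator imports retained at the top of this file). A rough sketch of the replacement call, assuming the KIP-138 API in this tree and a processor holding a valid context:

    import org.apache.kafka.streams.processor.Cancellable;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.Punctuator;

    public class PunctuationExample {
        private Cancellable punctuation;

        void init(final ProcessorContext context) {
            // register a stream-time punctuation that fires every 1000 ms;
            // punctuation.cancel() stops it later
            punctuation = context.schedule(1000L, PunctuationType.STREAM_TIME, new Punctuator() {
                @Override
                public void punctuate(final long timestamp) {
                    // periodic work goes here
                }
            });
        }
    }
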
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ff65d31..0d5cd48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
@@ -207,7 +207,7 @@ public class InternalTopologyBuilder {
                 } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
                     // the same topic cannot be matched to more than one pattern
                     // TODO: we should lift this requirement in the future
-                    throw new TopologyBuilderException("Topic " + update +
+                    throw new TopologyException("Topic " + update +
                         " is already matched for another regex pattern " + topicToPatterns.get(update) +
                         " and hence cannot be matched to this regex pattern " + pattern + " any more.");
                 } else if (isMatch(update)) {
@@ -293,18 +293,18 @@ public class InternalTopologyBuilder {
         return this;
     }
 
-    public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer keyDeserializer,
                                 final Deserializer valDeserializer,
                                 final String... topics) {
         if (topics.length == 0) {
-            throw new TopologyBuilderException("You must provide at least one topic");
+            throw new TopologyException("You must provide at least one topic");
         }
         Objects.requireNonNull(name, "name must not be null");
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String topic : topics) {
@@ -319,67 +319,7 @@ public class InternalTopologyBuilder {
         nodeGrouper.add(name);
     }
 
-    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                     final String sourceName,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Deserializer keyDeserializer,
-                                     final Deserializer valueDeserializer,
-                                     final String topic,
-                                     final String processorName,
-                                     final ProcessorSupplier stateUpdateSupplier) {
-        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
-        Objects.requireNonNull(sourceName, "sourceName must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
-        Objects.requireNonNull(processorName, "processorName must not be null");
-        if (nodeFactories.containsKey(sourceName)) {
-            throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
-        }
-        if (nodeFactories.containsKey(processorName)) {
-            throw new TopologyBuilderException("Processor " + processorName + " is already added.");
-        }
-        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
-            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
-        }
-        if (storeSupplier.loggingEnabled()) {
-            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
-        }
-        if (sourceName.equals(processorName)) {
-            throw new TopologyBuilderException("sourceName and processorName must be different.");
-        }
-
-        validateTopicNotAlreadyRegistered(topic);
-
-        globalTopics.add(topic);
-        final String[] topics = {topic};
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
-        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
-        nodeGrouper.add(sourceName);
-
-        final String[] predecessors = {sourceName};
-        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
-        nodeFactory.addStateStore(storeSupplier.name());
-        nodeFactories.put(processorName, nodeFactory);
-        nodeGrouper.add(processorName);
-        nodeGrouper.unite(processorName, predecessors);
-
-        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
-        connectSourceStoreAndTopic(storeSupplier.name(), topic);
-    }
-
-    private void validateTopicNotAlreadyRegistered(final String topic) {
-        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
-            throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
-        }
-
-        for (final Pattern pattern : nodeToSourcePatterns.values()) {
-            if (pattern.matcher(topic).matches()) {
-                throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
-            }
-        }
-    }
-
-    public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer keyDeserializer,
@@ -389,12 +329,12 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(name, "name can't be null");
 
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String sourceTopicName : sourceTopicNames) {
             if (topicPattern.matcher(sourceTopicName).matches()) {
-                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
+                throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
             }
         }
 
@@ -414,15 +354,18 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String predecessor : predecessorNames) {
             if (predecessor.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
             if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
+            }
+            if (nodeToSinkTopic.containsKey(predecessor)) {
+                throw new TopologyException("Sink " + predecessor + " cannot be used as a parent.");
             }
         }
 
@@ -438,15 +381,15 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(supplier, "supplier must not be null");
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String predecessor : predecessorNames) {
             if (predecessor.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
             if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
             }
         }
 
@@ -459,7 +402,7 @@ public class InternalTopologyBuilder {
                                     final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
-            throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
+            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
         }
 
         stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
@@ -471,43 +414,109 @@ public class InternalTopologyBuilder {
         }
     }
 
+    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                     final String sourceName,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Deserializer keyDeserializer,
+                                     final Deserializer valueDeserializer,
+                                     final String topic,
+                                     final String processorName,
+                                     final ProcessorSupplier stateUpdateSupplier) {
+        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
+        Objects.requireNonNull(sourceName, "sourceName must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
+        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+        Objects.requireNonNull(processorName, "processorName must not be null");
+        if (nodeFactories.containsKey(sourceName)) {
+            throw new TopologyException("Processor " + sourceName + " is already added.");
+        }
+        if (nodeFactories.containsKey(processorName)) {
+            throw new TopologyException("Processor " + processorName + " is already added.");
+        }
+        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
+            throw new TopologyException("StateStore " + storeSupplier.name() + " is already added.");
+        }
+        if (storeSupplier.loggingEnabled()) {
+            throw new TopologyException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
+        }
+        if (sourceName.equals(processorName)) {
+            throw new TopologyException("sourceName and processorName must be different.");
+        }
+
+        validateTopicNotAlreadyRegistered(topic);
+
+        globalTopics.add(topic);
+        final String[] topics = {topic};
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
+        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+        nodeGrouper.add(sourceName);
+
+        final String[] predecessors = {sourceName};
+        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
+        nodeFactory.addStateStore(storeSupplier.name());
+        nodeFactories.put(processorName, nodeFactory);
+        nodeGrouper.add(processorName);
+        nodeGrouper.unite(processorName, predecessors);
+
+        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
+        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+    }
+
+    private void validateTopicNotAlreadyRegistered(final String topic) {
+        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
+            throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+        }
+
+        for (final Pattern pattern : nodeToSourcePatterns.values()) {
+            if (pattern.matcher(topic).matches()) {
+                throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
+            }
+        }
+    }
+
     public final void connectProcessorAndStateStores(final String processorName,
                                                      final String... stateStoreNames) {
         Objects.requireNonNull(processorName, "processorName can't be null");
-        if (stateStoreNames != null) {
-            for (final String stateStoreName : stateStoreNames) {
-                connectProcessorAndStateStore(processorName, stateStoreName);
-            }
+        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null");
+        if (stateStoreNames.length == 0) {
+            throw new TopologyException("Must provide at least one state store name.");
+        }
+        for (final String stateStoreName : stateStoreNames) {
+            connectProcessorAndStateStore(processorName, stateStoreName);
         }
     }
 
+    // TODO: this method is only used by the DSL and we might want to refactor this part
     public final void connectSourceStoreAndTopic(final String sourceStoreName,
                                                   final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
-            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
         }
         storeToChangelogTopic.put(sourceStoreName, topic);
     }
 
+    // TODO: this method is only used by the DSL and we might want to refactor this part
     public final void connectProcessors(final String... processorNames) {
         if (processorNames.length < 2) {
-            throw new TopologyBuilderException("At least two processors need to participate in the connection.");
+            throw new TopologyException("At least two processors need to participate in the connection.");
         }
 
         for (final String processorName : processorNames) {
             if (!nodeFactories.containsKey(processorName)) {
-                throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+                throw new TopologyException("Processor " + processorName + " is not added yet.");
             }
         }
 
         nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
     }
 
+    // TODO: this method is only used by the DSL and we might want to refactor this part
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
     }
 
+    // TODO: this method is only used by the DSL and we might want to refactor this part
     public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
@@ -515,10 +524,10 @@ public class InternalTopologyBuilder {
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
         if (!stateFactories.containsKey(stateStoreName)) {
-            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
+            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
         }
         if (!nodeFactories.containsKey(processorName)) {
-            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+            throw new TopologyException("Processor " + processorName + " is not added yet.");
         }
 
         final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
@@ -535,7 +544,7 @@ public class InternalTopologyBuilder {
             processorNodeFactory.addStateStore(stateStoreName);
             connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
         } else {
-            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
         }
     }
 
@@ -588,7 +597,7 @@ public class InternalTopologyBuilder {
 
     private <T> void maybeAddToResetList(final Collection<T> earliestResets,
                                          final Collection<T> latestResets,
-                                         final TopologyBuilder.AutoOffsetReset offsetReset,
+                                         final Topology.AutoOffsetReset offsetReset,
                                          final T item) {
         if (offsetReset != null) {
             switch (offsetReset) {
@@ -599,7 +608,7 @@ public class InternalTopologyBuilder {
                     latestResets.add(item);
                     break;
                 default:
-                    throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
+                    throw new TopologyException(String.format("Unrecognized reset policy %s", offsetReset));
             }
         }
     }
@@ -759,7 +768,7 @@ public class InternalTopologyBuilder {
                         }
                     }
                 } else {
-                    throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
+                    throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
             }
         }
@@ -782,8 +791,8 @@ public class InternalTopologyBuilder {
      *
      * @return groups of topic names
      */
-    public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() {
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>();
+    public synchronized Map<Integer, TopicsInfo> topicGroups() {
+        final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
         if (nodeGroups == null) {
             nodeGroups = makeNodeGroups();
@@ -839,7 +848,7 @@ public class InternalTopologyBuilder {
                 }
             }
             if (!sourceTopics.isEmpty()) {
-                topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo(
+                topicGroups.put(entry.getKey(), new TopicsInfo(
                         Collections.unmodifiableSet(sinkTopics),
                         Collections.unmodifiableSet(sourceTopics),
                         Collections.unmodifiableMap(internalSourceTopics),
@@ -921,7 +930,7 @@ public class InternalTopologyBuilder {
                                       final Set<String> otherTopics) {
         for (final Pattern otherPattern : otherPatterns) {
             if (builtPattern.pattern().contains(otherPattern.pattern())) {
-                throw new TopologyBuilderException(
+                throw new TopologyException(
                     String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
                         otherPattern.pattern(),
                         builtPattern.pattern()));
@@ -930,7 +939,7 @@ public class InternalTopologyBuilder {
 
         for (final String otherTopic : otherTopics) {
             if (builtPattern.matcher(otherTopic).matches()) {
-                throw new TopologyBuilderException(
+                throw new TopologyException(
                     String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
                         builtPattern.pattern(),
                         otherTopic));
@@ -995,7 +1004,7 @@ public class InternalTopologyBuilder {
 
     private String decorateTopic(final String topic) {
         if (applicationId == null) {
-            throw new TopologyBuilderException("there are internal topics and "
+            throw new TopologyException("there are internal topics and "
                     + "applicationId hasn't been set. Call "
                     + "setApplicationId first");
         }
@@ -1397,6 +1406,49 @@ public class InternalTopologyBuilder {
         }
     }
 
+    public static class TopicsInfo {
+        public Set<String> sinkTopics;
+        public Set<String> sourceTopics;
+        public Map<String, InternalTopicConfig> stateChangelogTopics;
+        public Map<String, InternalTopicConfig> repartitionSourceTopics;
+
+        TopicsInfo(final Set<String> sinkTopics,
+                   final Set<String> sourceTopics,
+                   final Map<String, InternalTopicConfig> repartitionSourceTopics,
+                   final Map<String, InternalTopicConfig> stateChangelogTopics) {
+            this.sinkTopics = sinkTopics;
+            this.sourceTopics = sourceTopics;
+            this.stateChangelogTopics = stateChangelogTopics;
+            this.repartitionSourceTopics = repartitionSourceTopics;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (o instanceof TopicsInfo) {
+                final TopicsInfo other = (TopicsInfo) o;
+                return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            final long n = ((long) sourceTopics.hashCode() << 32) | (stateChangelogTopics.hashCode() & 0xFFFFFFFFL); // mask to avoid sign extension clobbering the high bits
+            return (int) (n % 0xFFFFFFFFL);
+        }
+
+        @Override
+        public String toString() {
+            return "TopicsInfo{" +
+                "sinkTopics=" + sinkTopics +
+                ", sourceTopics=" + sourceTopics +
+                ", repartitionSourceTopics=" + repartitionSourceTopics +
+                ", stateChangelogTopics=" + stateChangelogTopics +
+                '}';
+        }
+    }
+
     public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
         private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>();
         private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>();

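One behavioral detail of the relocated TopicsInfo worth noting: equals() and hashCode() deliberately consider only sourceTopics and stateChangelogTopics, so two instances that differ in their sink or repartition topics still compare equal. A small sketch (the example class is hypothetical and must live in the same package, since the constructor is package-private):

    package org.apache.kafka.streams.processor.internals;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;

    public class TopicsInfoEqualityExample {
        public static void main(final String[] args) {
            final Map<String, InternalTopicConfig> noChangelogs = Collections.emptyMap();
            final Set<String> source = Collections.singleton("source-topic");
            // same sourceTopics and stateChangelogTopics, different sinkTopics
            final TopicsInfo a = new TopicsInfo(Collections.singleton("sink-a"), source, noChangelogs, noChangelogs);
            final TopicsInfo b = new TopicsInfo(Collections.singleton("sink-b"), source, noChangelogs, noChangelogs);
            System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints true
        }
    }
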
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 79c38b0..eb2a171 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -57,6 +57,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     /**
      * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
      */
+    @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index e26d110..1d9e722 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -35,6 +35,7 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
         this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
         final Object key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e8b6a1a..f9ae216 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
@@ -331,10 +330,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // parse the topology to determine the repartition source topics,
         // making sure they are created with a number of partitions equal to
         // the maximum number of partitions across the dependent sub-topologies' source topics
-        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
 
         Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
-        for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
             for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
             }
@@ -344,13 +343,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         do {
             numPartitionsNeeded = false;
 
-            for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                 for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                     int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
 
                     // try set the number of partitions for this repartition topic if it is not set yet
                     if (numPartitions == UNKNOWN) {
-                        for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+                        for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                             Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
 
                             if (otherSinkTopics.contains(topicName)) {
@@ -418,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // get the tasks as partition groups from the partition grouper
         Set<String> allSourceTopics = new HashSet<>();
         Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             allSourceTopics.addAll(entry.getValue().sourceTopics);
             sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
         }
@@ -462,7 +461,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // add tasks to state change log topic subscribers
         Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             final int topicGroupId = entry.getKey();
             final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
 
@@ -646,6 +645,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      *
      * @param topicPartitions map from the names of the topics to be created to their metadata, including the number of partitions
      */
+    @SuppressWarnings("deprecation")
     private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
         log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
 
@@ -775,6 +775,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             this.logPrefix = String.format("stream-thread [%s]", threadName);
         }
 
+        @SuppressWarnings("deprecation")
         void validate(final Set<String> copartitionGroup,
                       final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
                       final Cluster metadata) {