Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/20 17:47:54 UTC

kafka git commit: KAFKA-3856; Cleanup Kafka Stream builder API (KIP-120)

Repository: kafka
Updated Branches:
  refs/heads/trunk e0099e1f5 -> b04bed022


KAFKA-3856; Cleanup Kafka Stream builder API (KIP-120)

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bb...@gmail.com>

Closes #2301 from mjsax/kafka-3856-topology-builder-API


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b04bed02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b04bed02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b04bed02

Branch: refs/heads/trunk
Commit: b04bed022aec0f1f478a03383ab5184f048133b6
Parents: e0099e1
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jul 20 18:47:47 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Jul 20 18:47:47 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   4 +-
 .../streams/processor/TopologyBuilder.java      | 144 +++++-
 .../streams/processor/TopologyDescription.java  | 476 +++++++++++++++++++
 .../kafka/streams/processor/TopologyTest.java   | 405 ++++++++++++++++
 4 files changed, 1012 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
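
For context, a minimal usage sketch of the new API (not part of this commit). Note
that in this patch describe() and TopologyDescription are still package-private
(see the TODO in TopologyDescription.java), so the call below only compiles from
within org.apache.kafka.streams.processor, e.g. from a test; MyProcessorSupplier
is a hypothetical ProcessorSupplier implementation.

    // build a simple source -> processor -> sink topology
    final TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic");
    builder.addProcessor("processor", new MyProcessorSupplier(), "source");
    builder.addSink("sink", "output-topic", "processor");

    // describe() groups the nodes into sub-topologies and global stores;
    // its toString() renders one block per sub-topology plus all global stores
    System.out.println(builder.describe());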


http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5c00ede..6d2b559 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -117,7 +117,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
+              files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
 
     <suppress checks="MethodLength"
               files="StreamPartitionAssignor.java"/>
@@ -128,7 +128,7 @@
               files="RocksDBWindowStoreSupplier.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
+              files="(TopologyBuilder|KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
 
     <suppress checks="CyclomaticComplexity"
               files="TopologyBuilder.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index c3614bb..4508c77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -144,23 +144,25 @@ public class TopologyBuilder {
     }
 
     private static abstract class NodeFactory {
-        public final String name;
+        final String name;
+        final String[] parents;
 
-        NodeFactory(String name) {
+        NodeFactory(final String name, final String[] parents) {
             this.name = name;
+            this.parents = parents;
         }
 
         public abstract ProcessorNode build();
+
+        abstract TopologyDescription.AbstractNode describe();
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
-        private final String[] parents;
         private final ProcessorSupplier<?, ?> supplier;
         private final Set<String> stateStoreNames = new HashSet<>();
 
         ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) {
-            super(name);
-            this.parents = parents.clone();
+            super(name, parents.clone());
             this.supplier = supplier;
         }
 
@@ -172,6 +174,11 @@ public class TopologyBuilder {
         public ProcessorNode build() {
             return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
         }
+
+        @Override
+        TopologyDescription.Processor describe() {
+            return new TopologyDescription.Processor(name, new HashSet<>(stateStoreNames));
+        }
     }
 
     private class SourceNodeFactory extends NodeFactory {
@@ -187,7 +194,7 @@ public class TopologyBuilder {
                                   final TimestampExtractor timestampExtractor,
                                   final Deserializer<?> keyDeserializer,
                                   final Deserializer<?> valDeserializer) {
-            super(name);
+            super(name, new String[0]);
             this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
             this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
@@ -236,18 +243,30 @@ public class TopologyBuilder {
         private boolean isMatch(String topic) {
             return this.pattern.matcher(topic).matches();
         }
+
+        @Override
+        TopologyDescription.Source describe() {
+            String sourceTopics;
+
+            if (pattern == null) {
+                sourceTopics = topics.toString();
+                sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last character, i.e., the enclosing "[]"
+            } else {
+                sourceTopics = pattern.toString();
+            }
+
+            return new TopologyDescription.Source(name, sourceTopics);
+        }
     }
 
     private class SinkNodeFactory<K, V> extends NodeFactory {
-        private final String[] parents;
         private final String topic;
         private final Serializer<K> keySerializer;
         private final Serializer<V> valSerializer;
         private final StreamPartitioner<? super K, ? super V> partitioner;
 
         private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
-            super(name);
-            this.parents = parents.clone();
+            super(name, parents.clone());
             this.topic = topic;
             this.keySerializer = keySerializer;
             this.valSerializer = valSerializer;
@@ -263,6 +282,11 @@ public class TopologyBuilder {
                 return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
             }
         }
+
+        @Override
+        TopologyDescription.Sink describe() {
+            return new TopologyDescription.Sink(name, topic);
+        }
     }
 
     public static class TopicsInfo {
@@ -1196,12 +1220,8 @@ public class TopologyBuilder {
         for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
             final Set<String> nodes = nodeGroup.getValue();
             for (String node : nodes) {
-                final NodeFactory nodeFactory = nodeFactories.get(node);
-                if (nodeFactory instanceof SourceNodeFactory) {
-                    final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
-                    if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
-                        globalGroups.addAll(nodes);
-                    }
+                if (isGlobalSource(node)) {
+                    globalGroups.addAll(nodes);
                 }
             }
         }
@@ -1558,4 +1578,98 @@ public class TopologyBuilder {
         setRegexMatchedTopicsToSourceNodes();
         setRegexMatchedTopicToStateStore();
     }
+
+    private boolean isGlobalSource(final String nodeName) {
+        final NodeFactory nodeFactory = nodeFactories.get(nodeName);
+
+        if (nodeFactory instanceof SourceNodeFactory) {
+            final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
+            if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    TopologyDescription describe() {
+        final TopologyDescription description = new TopologyDescription();
+
+        describeSubtopologies(description);
+        describeGlobalStores(description);
+
+        return description;
+    }
+
+    private void describeSubtopologies(final TopologyDescription description) {
+        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+
+            final Set<String> allNodesOfGroups = nodeGroup.getValue();
+            final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
+
+            if (!isNodeGroupOfGlobalStores) {
+                describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
+            }
+        }
+    }
+
+    private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
+        for (final String node : allNodesOfGroups) {
+            if (isGlobalSource(node)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void describeSubtopology(final TopologyDescription description,
+                                     final Integer subtopologyId,
+                                     final Set<String> nodeNames) {
+
+        final HashMap<String, TopologyDescription.AbstractNode> nodesByName = new HashMap<>();
+
+        // add all nodes
+        for (final String nodeName : nodeNames) {
+            nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
+        }
+
+        // connect each node to its predecessors and successors
+        for (final TopologyDescription.AbstractNode node : nodesByName.values()) {
+            for (final String predecessorName : nodeFactories.get(node.name()).parents) {
+                final TopologyDescription.AbstractNode predecessor = nodesByName.get(predecessorName);
+                node.addPredecessor(predecessor);
+                predecessor.addSuccessor(node);
+            }
+        }
+
+        description.addSubtopology(new TopologyDescription.Subtopology(
+            subtopologyId,
+            new HashSet<TopologyDescription.Node>(nodesByName.values())));
+    }
+
+    private void describeGlobalStores(final TopologyDescription description) {
+        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+            final Set<String> nodes = nodeGroup.getValue();
+
+            final Iterator<String> it = nodes.iterator();
+            while (it.hasNext()) {
+                final String node = it.next();
+
+                if (isGlobalSource(node)) {
+                    // we found a GlobalStore node group; those contain exactly two nodes: {sourceNode, processorNode}
+                    it.remove(); // remove sourceNode from group
+                    final String processorNode = nodes.iterator().next(); // get remaining processorNode
+
+                    description.addGlobalStore(new TopologyDescription.GlobalStore(
+                        node,
+                        processorNode,
+                        ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+                        nodeToSourceTopics.get(node).get(0)
+                    ));
+                    break;
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
new file mode 100644
index 0000000..0949bf5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A meta representation of a {@link Topology topology}.
+ * <p>
+ * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
+ * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
+ * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
+ * {@link Topology#addSource(String, String...) reads} from the same topic.
+ * <p>
+ * For {@link KafkaStreams#start() execution} sub-topologies are translated into {@link StreamTask tasks}.
+ */
+// TODO make public (hide until KIP-120 is fully implemented)
+final class TopologyDescription {
+    private final Set<Subtopology> subtopologies = new HashSet<>();
+    private final Set<GlobalStore> globalStores = new HashSet<>();
+
+    /**
+     * A connected sub-graph of a {@link Topology}.
+     * <p>
+     * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
+     * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
+     * (i.e., if multiple processors share the same state).
+     */
+    public final static class Subtopology {
+        private final int id;
+        private final Set<Node> nodes;
+
+        Subtopology(final int id,
+                    final Set<Node> nodes) {
+            this.id = id;
+            this.nodes = nodes;
+        }
+
+        /**
+         * Internally assigned unique ID.
+         * @return the ID of the sub-topology
+         */
+        public int id() {
+            return id;
+        }
+
+        /**
+         * All nodes of this sub-topology.
+         * @return set of all nodes within the sub-topology
+         */
+        public Set<Node> nodes() {
+            return Collections.unmodifiableSet(nodes);
+        }
+
+        @Override
+        public String toString() {
+            return "Sub-topology: " + id + "\n" + nodesAsString();
+        }
+
+        private String nodesAsString() {
+            final StringBuilder sb = new StringBuilder();
+            for (final Node node : nodes) {
+                sb.append("    ");
+                sb.append(node);
+                sb.append('\n');
+            }
+            return sb.toString();
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Subtopology that = (Subtopology) o;
+            return id == that.id
+                && nodes.equals(that.nodes);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, nodes);
+        }
+    }
+
+    /**
+     * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
+     * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
+     * String, ProcessorSupplier) global store}.
+     * Adding a global store results in adding a source node and one stateful processor node.
+     * Note that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
+     * global stores are not connected to each other.
+     * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
+     * stores will never be part of any {@link Subtopology}.
+     */
+    public final static class GlobalStore {
+        private final Source source;
+        private final Processor processor;
+
+        GlobalStore(final String sourceName,
+                    final String processorName,
+                    final String storeName,
+                    final String topicName) {
+            source = new Source(sourceName, topicName);
+            processor = new Processor(processorName, Collections.singleton(storeName));
+            source.successors.add(processor);
+            processor.predecessors.add(source);
+        }
+
+        /**
+         * The source node reading from a "global" topic.
+         * @return the "global" source node
+         */
+        public Source source() {
+            return source;
+        }
+
+        /**
+         * The processor node maintaining the global store.
+         * @return the "global" processor node
+         */
+        public Processor processor() {
+            return processor;
+        }
+
+        @Override
+        public String toString() {
+            return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> "
+                + processor.name + "(store: " + processor.stores.iterator().next() + ")\n";
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final GlobalStore that = (GlobalStore) o;
+            return source.equals(that.source)
+                && processor.equals(that.processor);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(source, processor);
+        }
+    }
+
+    /**
+     * A node of a topology. Can be a source, sink, or processor node.
+     */
+    public interface Node {
+        /**
+         * The name of the node. Will never be {@code null}.
+         * @return the name of the node
+         */
+        String name();
+        /**
+         * The predecessors of this node within a sub-topology.
+         * Note that sources do not have any predecessors.
+         * Will never be {@code null}.
+         * @return set of all predecessors
+         */
+        Set<Node> predecessors();
+        /**
+         * The successors of this node within a sub-topology.
+         * Note that sinks do not have any successors.
+         * Will never be {@code null}.
+         * @return set of all successors
+         */
+        Set<Node> successors();
+    }
+
+    abstract static class AbstractNode implements Node {
+        final String name;
+        final Set<Node> predecessors = new HashSet<>();
+        final Set<Node> successors = new HashSet<>();
+
+        AbstractNode(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public Set<Node> predecessors() {
+            return Collections.unmodifiableSet(predecessors);
+        }
+
+        @Override
+        public Set<Node> successors() {
+            return Collections.unmodifiableSet(successors);
+        }
+
+        void addPredecessor(final Node predecessor) {
+            predecessors.add(predecessor);
+        }
+
+        void addSuccessor(final Node successor) {
+            successors.add(successor);
+        }
+    }
+
+    /**
+     * A source node of a topology.
+     */
+    public final static class Source extends AbstractNode {
+        private final String topics;
+
+        Source(final String name,
+               final String topics) {
+            super(name);
+            this.topics = topics;
+        }
+
+        /**
+         * The topic names this source node is reading from.
+         * @return comma-separated list of topic names or pattern (as String)
+         */
+        public String topics() {
+            return topics;
+        }
+
+        @Override
+        void addPredecessor(final Node predecessor) {
+            throw new UnsupportedOperationException("Sources don't have predecessors.");
+        }
+
+        @Override
+        public String toString() {
+            return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Source source = (Source) o;
+            // omit successors to avoid infinite loops
+            return name.equals(source.name)
+                && topics.equals(source.topics);
+        }
+
+        @Override
+        public int hashCode() {
+            // omit successors as they might change and alter the hash code
+            return Objects.hash(name, topics);
+        }
+    }
+
+    /**
+     * A processor node of a topology.
+     */
+    public final static class Processor extends AbstractNode {
+        private final Set<String> stores;
+
+        Processor(final String name,
+                  final Set<String> stores) {
+            super(name);
+            this.stores = stores;
+        }
+
+        /**
+         * The names of all connected stores.
+         * @return set of store names
+         */
+        public Set<String> stores() {
+            return Collections.unmodifiableSet(stores);
+        }
+
+        @Override
+        public String toString() {
+            return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Processor processor = (Processor) o;
+            // omit successors to avoid infinite loops
+            return name.equals(processor.name)
+                && stores.equals(processor.stores)
+                && predecessors.equals(processor.predecessors);
+        }
+
+        @Override
+        public int hashCode() {
+            // omit successors as they might change and alter the hash code
+            return Objects.hash(name, stores);
+        }
+    }
+
+    /**
+     * A sink node of a topology.
+     */
+    public final static class Sink extends AbstractNode {
+        private final String topic;
+
+        Sink(final String name,
+             final String topic) {
+            super(name);
+            this.topic = topic;
+        }
+
+        /**
+         * The topic name this sink node is writing to.
+         * @return a topic name
+         */
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        void addSuccessor(final Node successor) {
+            throw new UnsupportedOperationException("Sinks don't have successors.");
+        }
+
+        @Override
+        public String toString() {
+            return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Sink sink = (Sink) o;
+            return name.equals(sink.name)
+                && topic.equals(sink.topic)
+                && predecessors.equals(sink.predecessors);
+        }
+
+        @Override
+        public int hashCode() {
+            // omit predecessors as they might change and alter the hash code
+            return Objects.hash(name, topic);
+        }
+    }
+
+    void addSubtopology(final Subtopology subtopology) {
+        subtopologies.add(subtopology);
+    }
+
+    void addGlobalStore(final GlobalStore globalStore) {
+        globalStores.add(globalStore);
+    }
+
+    /**
+     * All sub-topologies of the represented topology.
+     * @return set of all sub-topologies
+     */
+    public Set<Subtopology> subtopologies() {
+        return Collections.unmodifiableSet(subtopologies);
+    }
+
+    /**
+     * All global stores of the represented topology.
+     * @return set of all global stores
+     */
+    public Set<GlobalStore> globalStores() {
+        return Collections.unmodifiableSet(globalStores);
+    }
+
+    @Override
+    public String toString() {
+        return subtopologiesAsString() + globalStoresAsString();
+    }
+
+    private static String nodeNames(final Set<Node> nodes) {
+        final StringBuilder sb = new StringBuilder();
+        if (!nodes.isEmpty()) {
+            for (final Node n : nodes) {
+                sb.append(n.name());
+                sb.append(", ");
+            }
+            sb.deleteCharAt(sb.length() - 1);
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    private String subtopologiesAsString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Sub-topologies: \n");
+        if (subtopologies.isEmpty()) {
+            sb.append("  none\n");
+        } else {
+            for (final Subtopology st : subtopologies) {
+                sb.append("  ");
+                sb.append(st);
+            }
+        }
+        return sb.toString();
+    }
+
+    private String globalStoresAsString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Global Stores:\n");
+        if (globalStores.isEmpty()) {
+            sb.append("  none\n");
+        } else {
+            for (final GlobalStore gs : globalStores) {
+                sb.append("  ");
+                sb.append(gs);
+            }
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final TopologyDescription that = (TopologyDescription) o;
+        return subtopologies.equals(that.subtopologies)
+            && globalStores.equals(that.globalStores);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(subtopologies, globalStores);
+    }
+
+}
+
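
For the simple source -> processor -> sink sketch near the top of this mail, the
toString() implementations above would render the description roughly as follows
(node ordering within a sub-topology is not deterministic, as the nodes are kept
in a HashSet):

    Sub-topologies:
      Sub-topology: 0
        Source: source(topics: input-topic) --> processor
        Processor: processor(stores: []) --> sink <-- source
        Sink: sink(topic: output-topic) <-- processor
    Global Stores:
      none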

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04bed02/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
new file mode 100644
index 0000000..17c5640
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+// TODO (remove this comment) Test name ok, we just use TopologyBuilder for now in this test until Topology gets added
+public class TopologyTest {
+    // TODO change from TopologyBuilder to Topology
+    private final TopologyBuilder topology = new TopologyBuilder();
+    private final TopologyDescription expectedDescription = new TopologyDescription();
+
+    @Test
+    public void shouldDescribeEmptyTopology() {
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void singleSourceShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+
+        expectedDescription.addSubtopology(
+            new TopologyDescription.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
+
+        expectedDescription.addSubtopology(
+            new TopologyDescription.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void singleSourcePatternShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
+
+        expectedDescription.addSubtopology(
+            new TopologyDescription.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void multipleSourcesShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+        expectedDescription.addSubtopology(
+            new TopologyDescription.Subtopology(0,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        expectedDescription.addSubtopology(
+            new TopologyDescription.Subtopology(1,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        expectedDescription.addSubtopology(
+            new TopologyDescription.Subtopology(2,
+                Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void sourceAndProcessorShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final String[] store = new String[] {"store"};
+        final TopologyDescription.Processor expectedProcessorNode
+            = addProcessorWithNewStore("processor", store, expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+
+    @Test
+    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final String[] stores = new String[] {"store1", "store2"};
+        final TopologyDescription.Processor expectedProcessorNode
+            = addProcessorWithNewStore("processor", stores, expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode);
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedProcessorNode2);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
+        final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode1, expectedSourceNode2);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
+        allNodes1.add(expectedSourceNode1);
+        allNodes1.add(expectedProcessorNode1);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1));
+
+        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
+        allNodes2.add(expectedSourceNode2);
+        allNodes2.add(expectedProcessorNode2);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2));
+
+        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
+        allNodes3.add(expectedSourceNode3);
+        allNodes3.add(expectedProcessorNode3);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
+        final TopologyDescription.Sink expectedSinkNode1 = addSink("sink1", "sinkTopic1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Sink expectedSinkNode2 = addSink("sink2", "sinkTopic2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Sink expectedSinkNode3 = addSink("sink3", "sinkTopic3", expectedSourceNode3);
+
+        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
+        allNodes1.add(expectedSourceNode1);
+        allNodes1.add(expectedSinkNode1);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1));
+
+        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
+        allNodes2.add(expectedSourceNode2);
+        allNodes2.add(expectedSinkNode2);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2));
+
+        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
+        allNodes3.add(expectedSourceNode3);
+        allNodes3.add(expectedSinkNode3);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void processorsWithSameSinkShouldHaveSameSubtopology() {
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
+
+        final TopologyDescription.Sink expectedSinkNode = addSink(
+            "sink",
+            "sinkTopic",
+            expectedProcessorNode1,
+            expectedProcessorNode2,
+            expectedProcessorNode3);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode2);
+        allNodes.add(expectedSourceNode3);
+        allNodes.add(expectedProcessorNode3);
+        allNodes.add(expectedSinkNode);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void processorsWithSharedStateShouldHaveSameSubtopology() {
+        final String[] store1 = new String[] {"store1"};
+        final String[] store2 = new String[] {"store2"};
+        final String[] bothStores = new String[] {store1[0], store2[0]};
+
+        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
+        final TopologyDescription.Processor expectedProcessorNode1
+            = addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
+
+        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
+        final TopologyDescription.Processor expectedProcessorNode2
+            = addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
+
+        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
+        final TopologyDescription.Processor expectedProcessorNode3
+            = addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSourceNode1);
+        allNodes.add(expectedProcessorNode1);
+        allNodes.add(expectedSourceNode2);
+        allNodes.add(expectedProcessorNode2);
+        allNodes.add(expectedSourceNode3);
+        allNodes.add(expectedProcessorNode3);
+        expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void shouldDescribeGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void shouldDescribeMultipleGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final String... sourceTopic) {
+        topology.addSource(sourceName, sourceTopic);
+        String allSourceTopics = sourceTopic[0];
+        for (int i = 1; i < sourceTopic.length; ++i) {
+            allSourceTopics += ", " + sourceTopic[i];
+        }
+        return new TopologyDescription.Source(sourceName, allSourceTopics);
+    }
+
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final Pattern sourcePattern) {
+        topology.addSource(sourceName, sourcePattern);
+        return new TopologyDescription.Source(sourceName, sourcePattern.toString());
+    }
+
+    private TopologyDescription.Processor addProcessor(final String processorName,
+                                                       final TopologyDescription.AbstractNode... parents) {
+        return addProcessorWithNewStore(processorName, new String[0], parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
+                                                                   final String[] storeNames,
+                                                                   final TopologyDescription.AbstractNode... parents) {
+        return addProcessorWithStore(processorName, storeNames, true, parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
+                                                                        final String[] storeNames,
+                                                                        final TopologyDescription.AbstractNode... parents) {
+        return addProcessorWithStore(processorName, storeNames, false, parents);
+    }
+
+    private TopologyDescription.Processor addProcessorWithStore(final String processorName,
+                                                                final String[] storeNames,
+                                                                final boolean newStores,
+                                                                final TopologyDescription.AbstractNode... parents) {
+        final String[] parentNames = new String[parents.length];
+        for (int i = 0; i < parents.length; ++i) {
+            parentNames[i] = parents[i].name();
+        }
+
+        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+        if (newStores) {
+            for (final String store : storeNames) {
+                topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
+            }
+        } else {
+            topology.connectProcessorAndStateStores(processorName, storeNames);
+        }
+        final TopologyDescription.Processor expectedProcessorNode
+            = new TopologyDescription.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
+
+        for (final TopologyDescription.AbstractNode parent : parents) {
+            parent.addSuccessor(expectedProcessorNode);
+            expectedProcessorNode.addPredecessor(parent);
+        }
+
+        return expectedProcessorNode;
+    }
+
+    private TopologyDescription.Sink addSink(final String sinkName,
+                                             final String sinkTopic,
+                                             final TopologyDescription.AbstractNode... parents) {
+        final String[] parentNames = new String[parents.length];
+        for (int i = 0; i < parents.length; ++i) {
+            parentNames[i] = parents[i].name();
+        }
+
+        topology.addSink(sinkName, sinkTopic, parentNames);
+        final TopologyDescription.Sink expectedSinkNode
+            = new TopologyDescription.Sink(sinkName, sinkTopic);
+
+        for (final TopologyDescription.AbstractNode parent : parents) {
+            parent.addSuccessor(expectedSinkNode);
+            expectedSinkNode.addPredecessor(parent);
+        }
+
+        return expectedSinkNode;
+    }
+
+    private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
+                                                                final String sourceName,
+                                                                final String globalTopicName,
+                                                                final String processorName) {
+        topology.addGlobalStore(
+            new MockStateStoreSupplier(globalStoreName, false, false),
+            sourceName,
+            null,
+            null,
+            globalTopicName,
+            processorName,
+            new MockProcessorSupplier());
+
+        final TopologyDescription.GlobalStore expectedGlobalStore = new TopologyDescription.GlobalStore(
+            sourceName,
+            processorName,
+            globalStoreName,
+            globalTopicName);
+
+        expectedDescription.addGlobalStore(expectedGlobalStore);
+    }
+
+}