Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/04 15:40:02 UTC

[kafka] branch trunk updated: KAFKA-6761: Part 1 of 3; Graph nodes (#4923)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 515ce21  KAFKA-6761: Part 1 of 3; Graph nodes (#4923)
515ce21 is described below

commit 515ce21c74743171df6cb069b4754f68fe7f2233
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri May 4 11:39:56 2018 -0400

    KAFKA-6761: Part 1 of 3; Graph nodes (#4923)
    
    This PR supersedes PR #4654, which was growing too large. All comments in that PR should be addressed here.
    I will attempt to break the PRs for the topology optimization effort into 3 PRs total and will follow this general plan:
    
    1. This PR only adds the graph nodes and the graph. The graph nodes hold the information used to make calls to the InternalTopologyBuilder when using the DSL. Graph nodes are stored in the StreamsTopologyGraph until the final topology needs building; at that point the graph is traversed and optimizations are applied. This PR contains no tests; it relies on the follow-up PR, which will run all current streams tests against the graph, and that should suffice.
    
    2. PR 2 will intercept all DSL calls and build the graph. The InternalStreamsBuilder uses the graph to provide the required info to the InternalTopologyBuilder and build a topology. The condition of satisfaction for this PR is that all current unit, integration and system tests pass using the graph.
    
    3. PR 3 adds some optimizations, mainly automatic repartitioning for operations that may modify a key and have child operations that would normally each create a separate repartition topic, which avoids possibly unnecessary repartition topics. For example, the following topology:
    
    ```
    KStream<String, String> mappedStreamOther = inputStream.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
        @Override
        public KeyValue<? extends String, ? extends String> apply(String key, String value) {
            return KeyValue.pair(key.substring(0, 3), value);
        }
    });
    
    mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
    mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(10000)).count().toStream().to("count-two-out");
    mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
    ```
    
    would create 3 repartition topics, but after applying an optimization strategy, only one is created.
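    
    The counting argument for the topology above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR series: SketchNode and RepartitionSketch are hypothetical names, and the real graph nodes carry far more information. The sketch assumes the optimization places a single repartition topic directly after a key-changing node and lets every descendant share it.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical, simplified stand-in for a graph node; not the StreamsGraphNode in this commit.
    final class SketchNode {
        final String name;
        final boolean keyChanging;       // e.g. map(), which may modify the key
        final boolean needsRepartition;  // e.g. a grouped aggregation after a key change
        final List<SketchNode> children = new ArrayList<>();
    
        SketchNode(final String name, final boolean keyChanging, final boolean needsRepartition) {
            this.name = name;
            this.keyChanging = keyChanging;
            this.needsRepartition = needsRepartition;
        }
    
        SketchNode addChild(final SketchNode child) {
            children.add(child);
            return this;
        }
    }
    
    final class RepartitionSketch {
    
        // Without optimization: every node that needs repartitioning gets its own topic.
        static int countUnoptimized(final SketchNode node) {
            int count = node.needsRepartition ? 1 : 0;
            for (final SketchNode child : node.children) {
                count += countUnoptimized(child);
            }
            return count;
        }
    
        // With the assumed optimization: one topic is placed right after a key-changing
        // node if anything in its subtree needs repartitioning, and the subtree shares it.
        // Nested key changes further down are ignored here to keep the sketch short.
        static int countOptimized(final SketchNode node) {
            if (node.keyChanging && countUnoptimized(node) > 0) {
                return 1;
            }
            int count = node.needsRepartition ? 1 : 0;
            for (final SketchNode child : node.children) {
                count += countOptimized(child);
            }
            return count;
        }
    
        // Mirrors the shape of the example: one map() feeding three windowed counts.
        static SketchNode exampleTopology() {
            final SketchNode source = new SketchNode("source", false, false);
            final SketchNode map = new SketchNode("map", true, false);
            source.addChild(map);
            map.addChild(new SketchNode("count-one", false, true));
            map.addChild(new SketchNode("count-two", false, true));
            map.addChild(new SketchNode("count-three", false, true));
            return source;
        }
    }
    ```
    
    For the example shape, countUnoptimized yields 3 and countOptimized yields 1, matching the numbers quoted above. Deferring this decision until the whole graph is known is what makes the sharing possible.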
    
    Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kstream/internals/BaseJoinProcessorNode.java   |  99 ++++++++++
 .../kstream/internals/KTableKTableJoinNode.java    | 155 +++++++++++++++
 .../kstream/internals/ProcessorParameters.java     |  46 +++++
 .../streams/kstream/internals/RepartitionNode.java | 165 ++++++++++++++++
 .../kstream/internals/StatefulProcessorNode.java   | 133 +++++++++++++
 .../kstream/internals/StatefulRepartitionNode.java | 169 ++++++++++++++++
 .../kstream/internals/StatefulSourceNode.java      | 197 +++++++++++++++++++
 .../kstream/internals/StatelessProcessorNode.java  |  79 ++++++++
 .../streams/kstream/internals/StreamSinkNode.java  |  72 +++++++
 .../kstream/internals/StreamSourceNode.java        |  92 +++++++++
 .../kstream/internals/StreamStreamJoinNode.java    | 213 +++++++++++++++++++++
 .../kstream/internals/StreamTableJoinNode.java     |  59 ++++++
 .../kstream/internals/StreamsGraphNode.java        | 106 ++++++++++
 .../kstream/internals/StreamsTopologyGraph.java    | 137 +++++++++++++
 14 files changed, 1722 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
new file mode 100644
index 0000000..899ee71
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * Utility base class containing the common fields between
+ * a Stream-Stream join and a Table-Table join
+ */
+abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode {
+
+    private final ProcessorSupplier<K, V1> joinThisProcessSupplier;
+    private final ProcessorSupplier<K, V2> joinOtherProcessSupplier;
+    private final ProcessorSupplier<K, VR> joinMergeProcessor;
+    private final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+    private final String joinThisProcessorName;
+    private final String joinOtherProcessorName;
+    private final String joinMergeProcessorName;
+    private final String thisJoinSide;
+    private final String otherJoinSide;
+
+
+    BaseJoinProcessorNode(final String parentProcessorNodeName,
+                          final String processorNodeName,
+                          final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                          final ProcessorParameters<K, V1> joinThisProcessorDetails,
+                          final ProcessorParameters<K, V2> joinOtherProcessDetails,
+                          final ProcessorParameters<K, VR> joinMergeProcessorDetails,
+                          final String thisJoinSide,
+                          final String otherJoinSide) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        this.valueJoiner = valueJoiner;
+        this.joinThisProcessSupplier = joinThisProcessorDetails.processorSupplier();
+        this.joinOtherProcessSupplier = joinOtherProcessDetails.processorSupplier();
+        this.joinMergeProcessor = joinMergeProcessorDetails.processorSupplier();
+        this.joinThisProcessorName = joinThisProcessorDetails.processorName();
+        this.joinOtherProcessorName = joinOtherProcessDetails.processorName();
+        this.joinMergeProcessorName = joinMergeProcessorDetails.processorName();
+        this.thisJoinSide = thisJoinSide;
+        this.otherJoinSide = otherJoinSide;
+    }
+
+    ProcessorSupplier<K, V1> joinThisProcessorSupplier() {
+        return joinThisProcessSupplier;
+    }
+
+    ProcessorSupplier<K, V2> joinOtherProcessorSupplier() {
+        return joinOtherProcessSupplier;
+    }
+
+    ProcessorSupplier<K, VR> joinMergeProcessor() {
+        return joinMergeProcessor;
+    }
+
+    ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner() {
+        return valueJoiner;
+    }
+
+    String joinThisProcessorName() {
+        return joinThisProcessorName;
+    }
+
+    String joinOtherProcessorName() {
+        return joinOtherProcessorName;
+    }
+
+    String joinMergeProcessorName() {
+        return joinMergeProcessorName;
+    }
+
+    String thisJoinSide() {
+        return thisJoinSide;
+    }
+
+    String otherJoinSide() {
+        return otherJoinSide;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
new file mode 100644
index 0000000..f76aa0d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Arrays;
+
+/**
+ * Too much specific information to generalize so the
+ * KTable-KTable join requires a specific node.
+ */
+class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
+
+    private final String[] joinThisStoreNames;
+    private final String[] joinOtherStoreNames;
+
+    KTableKTableJoinNode(final String parentProcessorNodeName,
+                         final String processorNodeName,
+                         final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                         final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                         final ProcessorParameters<K, V2> joinOtherProcessorParameters,
+                         final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                         final String thisJoinSide,
+                         final String otherJoinSide,
+                         final String[] joinThisStoreNames,
+                         final String[] joinOtherStoreNames) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              valueJoiner,
+              joinThisProcessorParameters,
+              joinOtherProcessorParameters,
+              joinMergeProcessorParameters,
+              thisJoinSide,
+              otherJoinSide);
+
+        this.joinThisStoreNames = joinThisStoreNames;
+        this.joinOtherStoreNames = joinOtherStoreNames;
+    }
+
+    String[] joinThisStoreNames() {
+        return Arrays.copyOf(joinThisStoreNames, joinThisStoreNames.length);
+    }
+
+    String[] joinOtherStoreNames() {
+        return Arrays.copyOf(joinOtherStoreNames, joinOtherStoreNames.length);
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
+        return new KTableKTableJoinNodeBuilder<>();
+    }
+
+    static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
+
+        private String processorNodeName;
+        private String parentProcessorNodeName;
+        private String[] joinThisStoreNames;
+        private ProcessorParameters<K, V1> joinThisProcessorParameters;
+        private String[] joinOtherStoreNames;
+        private ProcessorParameters<K, V2> joinOtherProcessorParameters;
+        private ProcessorParameters<K, VR> joinMergeProcessorParameters;
+        private ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+        private String thisJoinSide;
+        private String otherJoinSide;
+
+        private KTableKTableJoinNodeBuilder() {
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisStoreNames(final String[] joinThisStoreNames) {
+            this.joinThisStoreNames = joinThisStoreNames;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+            this.joinThisProcessorParameters = joinThisProcessorParameters;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withProcessorNodeName(String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
+            this.joinOtherStoreNames = joinOtherStoreNames;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
+            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+            this.valueJoiner = valueJoiner;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withThisJoinSide(final String thisJoinSide) {
+            this.thisJoinSide = thisJoinSide;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withOtherJoinSide(final String otherJoinSide) {
+            this.otherJoinSide = otherJoinSide;
+            return this;
+        }
+
+        KTableKTableJoinNode<K, V1, V2, VR> build() {
+
+            return new KTableKTableJoinNode<>(parentProcessorNodeName,
+                                              processorNodeName,
+                                              valueJoiner,
+                                              joinThisProcessorParameters,
+                                              joinOtherProcessorParameters,
+                                              joinMergeProcessorParameters,
+                                              thisJoinSide,
+                                              otherJoinSide,
+                                              joinThisStoreNames,
+                                              joinOtherStoreNames);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java
new file mode 100644
index 0000000..cab1589
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * Class used to represent a {@link ProcessorSupplier} and the name
+ * used to register it with the {@link org.apache.kafka.streams.processor.internals.InternalTopologyBuilder}
+ *
+ * Used by the Join nodes as there are several parameters, this abstraction helps
+ * keep the number of arguments more reasonable.
+ */
+class ProcessorParameters<K, V> {
+
+    private final ProcessorSupplier<K, V> processorSupplier;
+    private final String processorName;
+
+    ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, final String processorName) {
+        this.processorSupplier = processorSupplier;
+        this.processorName = processorName;
+    }
+
+    ProcessorSupplier<K, V> processorSupplier() {
+        return processorSupplier;
+    }
+
+    String processorName() {
+        return processorName;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java
new file mode 100644
index 0000000..d8aaee9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+class RepartitionNode<K, V> extends StatelessProcessorNode<K, V> {
+
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final String sinkName;
+    private final String sourceName;
+    private final String repartitionTopic;
+    private final String processorName;
+
+
+    RepartitionNode(final String parentProcessorNodeName,
+                    final String processorNodeName,
+                    final String sourceName,
+                    final ProcessorSupplier<K, V> processorSupplier,
+                    final Serde<K> keySerde,
+                    final Serde<V> valueSerde,
+                    final String sinkName,
+                    final String repartitionTopic,
+                    final String processorName) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              processorSupplier,
+              false);
+
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.sinkName = sinkName;
+        this.sourceName = sourceName;
+        this.repartitionTopic = repartitionTopic;
+        this.processorName = processorName;
+    }
+
+    Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    String sinkName() {
+        return sinkName;
+    }
+
+    String sourceName() {
+        return sourceName;
+    }
+
+    String repartitionTopic() {
+        return repartitionTopic;
+    }
+
+    String processorName() {
+        return processorName;
+    }
+
+    @Override
+    void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static <K, V> RepartitionNodeBuilder<K, V> repartitionNodeBuilder() {
+        return new RepartitionNodeBuilder<>();
+    }
+
+
+    static final class RepartitionNodeBuilder<K, V> {
+
+        private String processorNodeName;
+        private ProcessorSupplier<K, V> processorSupplier;
+        private Serde<K> keySerde;
+        private Serde<V> valueSerde;
+        private String sinkName;
+        private String sourceName;
+        private String repartitionTopic;
+        private String processorName;
+        private String parentProcessorNodeName;
+
+        private RepartitionNodeBuilder() {
+        }
+
+        RepartitionNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier<K, V> processorSupplier) {
+            this.processorSupplier = processorSupplier;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withKeySerde(final Serde<K> keySerde) {
+            this.keySerde = keySerde;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withValueSerde(final Serde<V> valueSerde) {
+            this.valueSerde = valueSerde;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withSinkName(final String sinkName) {
+            this.sinkName = sinkName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withRepartitionTopic(final String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withProcessorName(final String processorName) {
+            this.processorName = processorName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withProcessorNodeName(final String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        RepartitionNode<K, V> build() {
+
+            return new RepartitionNode<>(parentProcessorNodeName,
+                                         processorNodeName,
+                                         sourceName,
+                                         processorSupplier,
+                                         keySerde,
+                                         valueSerde,
+                                         sinkName,
+                                         repartitionTopic,
+                                         processorName);
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java
new file mode 100644
index 0000000..57d30fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Arrays;
+
+class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
+
+    private final String[] storeNames;
+    private final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier;
+    private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+
+
+    StatefulProcessorNode(final String parentNodeName,
+                          final String processorNodeName,
+                          final ProcessorSupplier<K, V> processorSupplier,
+                          final String[] storeNames,
+                          final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
+                          final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
+                          final boolean repartitionRequired) {
+        super(parentNodeName,
+              processorNodeName,
+              processorSupplier,
+              repartitionRequired);
+
+        this.storeNames = storeNames;
+        this.storeSupplier = storeSupplier;
+        this.storeBuilder = storeBuilder;
+    }
+
+
+    String[] storeNames() {
+        return Arrays.copyOf(storeNames, storeNames.length);
+    }
+
+    org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier() {
+        return storeSupplier;
+    }
+
+    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
+        return storeBuilder;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
+        return new StatefulProcessorNodeBuilder<>();
+    }
+
+    static final class StatefulProcessorNodeBuilder<K, V> {
+
+        private ProcessorSupplier processorSupplier;
+        private String processorNodeName;
+        private String parentProcessorNodeName;
+        private boolean repartitionRequired;
+        private String[] storeNames;
+        private org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier;
+        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+
+        private StatefulProcessorNodeBuilder() {
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier processorSupplier) {
+            this.processorSupplier = processorSupplier;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withProcessorNodeName(final String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
+            this.storeNames = storeNames;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
+            this.repartitionRequired = repartitionRequired;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+            this.storeSupplier = storeSupplier;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
+
+        StatefulProcessorNode<K, V> build() {
+            return new StatefulProcessorNode<>(parentProcessorNodeName,
+                                               processorNodeName,
+                                               processorSupplier,
+                                               storeNames,
+                                               storeSupplier,
+                                               storeBuilder,
+                                               repartitionRequired);
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java
new file mode 100644
index 0000000..e7d5140
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+class StatefulRepartitionNode<K, V, T> extends RepartitionNode<K, V> {
+
+    private final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier;
+    private final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized;
+
+    StatefulRepartitionNode(final String parentProcessorNodeName,
+                            final String processorNodeName,
+                            final String sourceName,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final String sinkName,
+                            final String repartitionTopic,
+                            final String processorName,
+                            final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier,
+                            final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+        super(parentProcessorNodeName,
+              processorNodeName,
+              sourceName,
+              null,
+              keySerde,
+              valueSerde,
+              sinkName,
+              repartitionTopic,
+              processorName);
+
+        this.statefulProcessorSupplier = statefulProcessorSupplier;
+        this.materialized = materialized;
+    }
+
+    ProcessorSupplier<K, Change<V>> statefulProcessorSupplier() {
+        return statefulProcessorSupplier;
+    }
+
+    MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized() {
+        return materialized;
+    }
+
+    ChangedSerializer<? extends V> changedValueSerializer() {
+        final Serializer<? extends V> valueSerializer = valueSerde() == null ? null : valueSerde().serializer();
+        return new ChangedSerializer<>(valueSerializer);
+
+    }
+
+    ChangedDeserializer<? extends V> changedValueDeserializer() {
+        final Deserializer<? extends V> valueDeserializer = valueSerde() == null ? null : valueSerde().deserializer();
+        return new ChangedDeserializer<>(valueDeserializer);
+    }
+
+    static <K, V, T> StatefulRepartitionNodeBuilder<K, V, T> statefulRepartitionNodeBuilder() {
+        return new StatefulRepartitionNodeBuilder<>();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+
+    static final class StatefulRepartitionNodeBuilder<K, V, T> {
+
+        private String parentProcessorNodeName;
+        private String processorNodeName;
+        private Serde<K> keySerde;
+        private Serde<V> valueSerde;
+        private String sinkName;
+        private String sourceName;
+        private String repartitionTopic;
+        private String processorName;
+        private ProcessorSupplier<K, Change<V>> statefulProcessorSupplier;
+        private MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized;
+
+        private StatefulRepartitionNodeBuilder() {
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withKeySerde(final Serde<K> keySerde) {
+            this.keySerde = keySerde;
+            return this;
+        }
+
+
+        StatefulRepartitionNodeBuilder<K, V, T> withValueSerde(final Serde<V> valueSerde) {
+            this.valueSerde = valueSerde;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withSinkName(final String sinkName) {
+            this.sinkName = sinkName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withRepartitionTopic(final String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withProcessorNodeName(final String processorNodeName) {
+            this.processorName = processorNodeName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withStatefulProcessorSupplier(final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier) {
+            this.statefulProcessorSupplier = statefulProcessorSupplier;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withMaterialized(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+            this.materialized = materialized;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withNodeName(final String nodeName) {
+            this.processorNodeName = nodeName;
+            return this;
+        }
+
+        public StatefulRepartitionNode<K, V, T> build() {
+
+            return new StatefulRepartitionNode<>(parentProcessorNodeName,
+                                                 processorNodeName,
+                                                 sourceName,
+                                                 keySerde,
+                                                 valueSerde,
+                                                 sinkName,
+                                                 repartitionTopic,
+                                                 processorName,
+                                                 statefulProcessorSupplier,
+                                                 materialized);
+
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java
new file mode 100644
index 0000000..b2fdc81
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Collections;
+
+/**
+ * Used to represent either a KTable source or a GlobalKTable source.
+ * The presence of a {@link KTableSource} indicates this source node supplies
+ * a {@link org.apache.kafka.streams.kstream.GlobalKTable}.
+ */
+class StatefulSourceNode<K, V> extends StreamSourceNode<K, V> {
+
+    private StateStoreSupplier<KeyValueStore> storeSupplier;
+    private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+    private final ProcessorSupplier<K, V> processorSupplier;
+    private final String sourceName;
+    private final String processorName;
+    private final KTableSource<K, V> kTableSource;
+
+    StatefulSourceNode(final String predecessorNodeName,
+                       final String nodeName,
+                       final String sourceName,
+                       final String processorName,
+                       final String topic,
+                       final ConsumedInternal<K, V> consumedInternal,
+                       final ProcessorSupplier<K, V> processorSupplier,
+                       final KTableSource<K, V> kTableSource) {
+
+        super(predecessorNodeName,
+              nodeName,
+              Collections.singletonList(topic),
+              consumedInternal);
+
+        this.processorSupplier = processorSupplier;
+        this.sourceName = sourceName;
+        this.processorName = processorName;
+        this.kTableSource = kTableSource;
+    }
+
+    StateStoreSupplier<KeyValueStore> storeSupplier() {
+        return storeSupplier;
+    }
+
+    void setStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        this.storeSupplier = storeSupplier;
+    }
+
+    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
+        return storeBuilder;
+    }
+
+    void setStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+        this.storeBuilder = storeBuilder;
+    }
+
+    ProcessorSupplier<K, V> processorSupplier() {
+        return processorSupplier;
+    }
+
+    String sourceName() {
+        return sourceName;
+    }
+
+    KTableSource<K, V> kTableSource() {
+        return kTableSource;
+    }
+
+    String processorName() {
+        return processorName;
+    }
+
+    boolean isGlobalKTable() {
+        return kTableSource != null;
+    }
+
+    static <K, V> StatefulSourceNodeBuilder<K, V> statefulSourceNodeBuilder() {
+        return new StatefulSourceNodeBuilder<>();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static final class StatefulSourceNodeBuilder<K, V> {
+
+        private String predecessorNodeName;
+        private String nodeName;
+        private String sourceName;
+        private String processorName;
+        private String topic;
+        private ConsumedInternal<K, V> consumedInternal;
+        private StateStoreSupplier<KeyValueStore> storeSupplier;
+        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        private ProcessorSupplier<K, V> processorSupplier;
+        private KTableSource<K, V> kTableSource;
+
+        private StatefulSourceNodeBuilder() {
+        }
+
+
+        StatefulSourceNodeBuilder<K, V> withPredecessorNodeName(final String predecessorNodeName) {
+            this.predecessorNodeName = predecessorNodeName;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withProcessorName(final String processorName) {
+            this.processorName = processorName;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withTopic(final String topic) {
+            this.topic = topic;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+            this.storeSupplier = storeSupplier;
+            return this;
+        }
+
+
+        StatefulSourceNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
+            this.consumedInternal = consumedInternal;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier<K, V> processorSupplier) {
+            this.processorSupplier = processorSupplier;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withKTableSource(final KTableSource<K, V> kTableSource) {
+            this.kTableSource = kTableSource;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
+            return this;
+        }
+
+        StatefulSourceNode<K, V> build() {
+            final StatefulSourceNode<K, V>
+                statefulSourceNode =
+                new StatefulSourceNode<>(predecessorNodeName,
+                                         nodeName,
+                                         sourceName,
+                                         processorName,
+                                         topic,
+                                         consumedInternal,
+                                         processorSupplier,
+                                         kTableSource);
+
+            statefulSourceNode.setRepartitionRequired(false);
+            if (storeSupplier != null) {
+                statefulSourceNode.setStoreSupplier(storeSupplier);
+            } else if (storeBuilder != null) {
+                statefulSourceNode.setStoreBuilder(storeBuilder);
+            }
+
+            return statefulSourceNode;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java
new file mode 100644
index 0000000..eecb068
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Used to represent any type of stateless operation:
+ *
+ * map, mapValues, flatMap, flatMapValues, filter, filterNot, branch
+ *
+ */
+class StatelessProcessorNode<K, V> extends StreamsGraphNode {
+
+    private final ProcessorSupplier<K, V> processorSupplier;
+
+    // Some processors need to register multiple parent names with
+    // the InternalTopologyBuilder; KStream#merge is one example.
+    // There is only one parent graph node, but the name of each merged KStream
+    // needs to be registered with InternalStreamsBuilder.
+
+    private List<String> multipleParentNames = new ArrayList<>();
+
+
+    StatelessProcessorNode(final String parentProcessorNodeName,
+                           final String processorNodeName,
+                           final ProcessorSupplier<K, V> processorSupplier,
+                           final boolean repartitionRequired) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              repartitionRequired);
+
+        this.processorSupplier = processorSupplier;
+    }
+
+    StatelessProcessorNode(final String parentProcessorNodeName,
+                           final String processorNodeName,
+                           final boolean repartitionRequired,
+                           final ProcessorSupplier<K, V> processorSupplier,
+                           final List<String> multipleParentNames) {
+
+        this(parentProcessorNodeName, processorNodeName, processorSupplier, repartitionRequired);
+
+        this.multipleParentNames = multipleParentNames;
+    }
+
+    ProcessorSupplier<K, V> processorSupplier() {
+        return processorSupplier;
+    }
+
+    List<String> multipleParentNames() {
+        return new ArrayList<>(multipleParentNames);
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java
new file mode 100644
index 0000000..0a65d1b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+class StreamSinkNode<K, V> extends StreamsGraphNode {
+
+    private final String topic;
+    private final ProducedInternal<K, V> producedInternal;
+
+    StreamSinkNode(final String parentProcessorNodeName,
+                   final String processorNodeName,
+                   final String topic,
+                   final ProducedInternal<K, V> producedInternal) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        this.topic = topic;
+        this.producedInternal = producedInternal;
+    }
+
+    String topic() {
+        return topic;
+    }
+
+    Serde<K> keySerde() {
+        return producedInternal.keySerde();
+    }
+
+    Serializer<K> keySerializer() {
+        return producedInternal.keySerde() != null ? producedInternal.keySerde().serializer() : null;
+    }
+
+    Serde<V> valueSerde() {
+        return producedInternal.valueSerde();
+    }
+
+    Serializer<V> valueSerializer() {
+        return producedInternal.valueSerde() != null ? producedInternal.valueSerde().serializer() : null;
+    }
+
+    StreamPartitioner<? super K, ? super V> streamPartitioner() {
+        return producedInternal.streamPartitioner();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java
new file mode 100644
index 0000000..cb86840
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+class StreamSourceNode<K, V> extends StreamsGraphNode {
+
+    private Collection<String> topics;
+    private Pattern topicPattern;
+    private final ConsumedInternal<K, V> consumedInternal;
+
+
+    StreamSourceNode(final String parentProcessorNodeName,
+                     final String processorNodeName,
+                     final Collection<String> topics,
+                     final ConsumedInternal<K, V> consumedInternal) {
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        this.topics = topics;
+        this.consumedInternal = consumedInternal;
+    }
+
+    StreamSourceNode(final String parentProcessorNodeName,
+                     final String processorNodeName,
+                     final Pattern topicPattern,
+                     final ConsumedInternal<K, V> consumedInternal) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        this.topicPattern = topicPattern;
+        this.consumedInternal = consumedInternal;
+    }
+
+    List<String> getTopics() {
+        return topics == null ? new ArrayList<String>() : new ArrayList<>(topics);
+    }
+
+    Pattern getTopicPattern() {
+        return topicPattern;
+    }
+
+    Serde<K> keySerde() {
+        return consumedInternal.keySerde();
+    }
+
+    Deserializer<K> keyDeserializer() {
+        return consumedInternal.keySerde() != null ? consumedInternal.keySerde().deserializer() : null;
+    }
+
+    TimestampExtractor timestampExtractor() {
+        return consumedInternal.timestampExtractor();
+    }
+
+    Topology.AutoOffsetReset autoOffsetReset() {
+        return consumedInternal.offsetResetPolicy();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java
new file mode 100644
index 0000000..90734eb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * Too much information to generalize, so Stream-Stream joins are
+ * represented by a specific node.
+ */
+class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
+
+    private final ProcessorSupplier<K, V1> thisWindowedStreamProcessorSupplier;
+    private final ProcessorSupplier<K, V2> otherWindowedStreamProcessorSupplier;
+    private final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
+    private final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
+    private final Joined<K, V1, V2> joined;
+
+    private final String thisWindowedStreamName;
+    private final String otherWindowedStreamName;
+
+
+    StreamStreamJoinNode(final String parentProcessorNodeName,
+                         final String processorNodeName,
+                         final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                         final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                         final ProcessorParameters<K, V2> joinOtherProcessorParameters,
+                         final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                         final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters,
+                         final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
+                         final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
+                         final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
+                         final Joined<K, V1, V2> joined,
+                         final String leftHandSideStreamName,
+                         final String otherStreamName) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              valueJoiner,
+              joinThisProcessorParameters,
+              joinOtherProcessorParameters,
+              joinMergeProcessorParameters,
+              leftHandSideStreamName,
+              otherStreamName);
+
+        this.thisWindowedStreamProcessorSupplier = thisWindowedStreamProcessorParameters.processorSupplier();
+        this.otherWindowedStreamProcessorSupplier = otherWindowedStreamProcessorParameters.processorSupplier();
+        this.thisWindowedStreamName = thisWindowedStreamProcessorParameters.processorName();
+        this.otherWindowedStreamName = otherWindowedStreamProcessorParameters.processorName();
+        this.thisWindowStoreBuilder = thisWindowStoreBuilder;
+        this.otherWindowStoreBuilder = otherWindowStoreBuilder;
+        this.joined = joined;
+    }
+
+    ProcessorSupplier<K, V1> thisWindowedStreamProcessorSupplier() {
+        return thisWindowedStreamProcessorSupplier;
+    }
+
+    ProcessorSupplier<K, V2> otherWindowedStreamProcessorSupplier() {
+        return otherWindowedStreamProcessorSupplier;
+    }
+
+    String thisWindowedStreamName() {
+        return thisWindowedStreamName;
+    }
+
+    String otherWindowedStreamName() {
+        return otherWindowedStreamName;
+    }
+
+    StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder() {
+        return thisWindowStoreBuilder;
+    }
+
+    StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder() {
+        return otherWindowStoreBuilder;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
+        return new StreamStreamJoinNodeBuilder<>();
+    }
+
+    static final class StreamStreamJoinNodeBuilder<K, V1, V2, VR> {
+
+        private String processorNodeName;
+        private String parentProcessorNodeName;
+        private ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+        private ProcessorParameters<K, V1> joinThisProcessorParameters;
+        private ProcessorParameters<K, V2> joinOtherProcessorParameters;
+        private ProcessorParameters<K, VR> joinMergeProcessorParameters;
+        private ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters;
+        private ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters;
+        private StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
+        private StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
+        private Joined<K, V1, V2> joined;
+        private String leftHandSideStreamName;
+        private String otherStreamName;
+
+
+        private StreamStreamJoinNodeBuilder() {
+        }
+
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+            this.valueJoiner = valueJoiner;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+            this.joinThisProcessorParameters = joinThisProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters) {
+            this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withProcessorNodeName(final String name) {
+            this.processorNodeName = name;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withParentProcessorNodeName(final String predecessorNodeName) {
+            this.parentProcessorNodeName = predecessorNodeName;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
+            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowedStreamProcessorParameters(final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters) {
+            this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withLeftHandSideStreamName(final String leftHandSideStreamName) {
+            this.leftHandSideStreamName = leftHandSideStreamName;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherStreamName(final String otherStreamName) {
+            this.otherStreamName = otherStreamName;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowStoreBuilder(final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder) {
+            this.thisWindowStoreBuilder = thisWindowStoreBuilder;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowStoreBuilder(final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder) {
+            this.otherWindowStoreBuilder = otherWindowStoreBuilder;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final Joined<K, V1, V2> joined) {
+            this.joined = joined;
+            return this;
+        }
+
+        StreamStreamJoinNode<K, V1, V2, VR> build() {
+
+            return new StreamStreamJoinNode<>(parentProcessorNodeName,
+                                              processorNodeName,
+                                              valueJoiner,
+                                              joinThisProcessorParameters,
+                                              joinOtherProcessorParameters,
+                                              joinMergeProcessorParameters,
+                                              thisWindowedStreamProcessorParameters,
+                                              otherWindowedStreamProcessorParameters,
+                                              thisWindowStoreBuilder,
+                                              otherWindowStoreBuilder,
+                                              joined,
+                                              leftHandSideStreamName,
+                                              otherStreamName);
+
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java
new file mode 100644
index 0000000..18f2055
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Arrays;
+
+/**
+ * Represents a join between a KStream and a KTable or GlobalKTable
+ */
+
+class StreamTableJoinNode<K1, K2, V1, V2, VR> extends StreamsGraphNode {
+
+    private final String[] storeNames;
+    private final ProcessorSupplier<K1, V1> processorSupplier;
+
+    StreamTableJoinNode(final String parentProcessorNodeName,
+                        final String processorNodeName,
+                        final ProcessorSupplier<K1, V1> processorSupplier,
+                        final String[] storeNames) {
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        // in a Stream-Table join, these are the names of the state stores associated with the KTable
+        this.storeNames = storeNames;
+        this.processorSupplier = processorSupplier;
+    }
+
+    String[] storeNames() {
+        return Arrays.copyOf(storeNames, storeNames.length);
+    }
+
+    ProcessorSupplier<K1, V1> processorSupplier() {
+        return processorSupplier;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java
new file mode 100644
index 0000000..4597513
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+
+abstract class StreamsGraphNode {
+
+    private StreamsGraphNode parentNode;
+    private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>();
+    private final String processorNodeName;
+    private String parentProcessorNodeName;
+    private boolean repartitionRequired;
+    private boolean triggersRepartitioning;
+    private Integer id;
+    private StreamsTopologyGraph streamsTopologyGraph;
+
+    StreamsGraphNode(final String parentProcessorNodeName,
+                     final String processorNodeName,
+                     final boolean repartitionRequired) {
+        this.parentProcessorNodeName = parentProcessorNodeName;
+        this.processorNodeName = processorNodeName;
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    StreamsGraphNode parentNode() {
+        return parentNode;
+    }
+
+    String parentProcessorNodeName() {
+        return parentProcessorNodeName;
+    }
+
+    void setParentProcessorNodeName(final String parentProcessorNodeName) {
+        this.parentProcessorNodeName = parentProcessorNodeName;
+    }
+
+    void setParentNode(final StreamsGraphNode parentNode) {
+        this.parentNode = parentNode;
+    }
+
+    Collection<StreamsGraphNode> children() {
+        return new LinkedHashSet<>(childNodes);
+    }
+
+    void addChildNode(final StreamsGraphNode node) {
+        this.childNodes.add(node);
+    }
+
+    String processorNodeName() {
+        return processorNodeName;
+    }
+
+    boolean repartitionRequired() {
+        return repartitionRequired;
+    }
+
+    void setRepartitionRequired(final boolean repartitionRequired) {
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    public boolean triggersRepartitioning() {
+        return triggersRepartitioning;
+    }
+
+    public void setTriggersRepartitioning(final boolean triggersRepartitioning) {
+        this.triggersRepartitioning = triggersRepartitioning;
+    }
+
+    void setId(final int id) {
+        this.id = id;
+    }
+
+    Integer id() {
+        return this.id;
+    }
+
+    public void setStreamsTopologyGraph(final StreamsTopologyGraph streamsTopologyGraph) {
+        this.streamsTopologyGraph = streamsTopologyGraph;
+    }
+
+    StreamsTopologyGraph streamsTopologyGraph() {
+        return streamsTopologyGraph;
+    }
+
+    abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
new file mode 100644
index 0000000..9d2b90b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class StreamsTopologyGraph {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamsTopologyGraph.class);
+    public static final String TOPOLOGY_ROOT = "root";
+
+    protected final StreamsGraphNode root = new StreamsGraphNode(null, TOPOLOGY_ROOT, false) {
+        @Override
+        void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+            // no-op for root node
+        }
+    };
+
+    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
+    private final Map<StreamsGraphNode, Set<StreamsGraphNode>> repartitioningNodeToRepartitioned = new HashMap<>();
+    private final Map<StreamsGraphNode, StreamSinkNode> stateStoreNodeToSinkNodes = new HashMap<>();
+    private final Map<String, StreamsGraphNode> nameToGraphNode = new HashMap<>();
+
+    private StreamsGraphNode previousNode;
+
+    StreamsTopologyGraph() {
+        nameToGraphNode.put(TOPOLOGY_ROOT, root);
+    }
+
+
+    public void addNode(final StreamsGraphNode node) {
+        node.setId(nodeIdCounter.getAndIncrement());
+
+        if (node.parentProcessorNodeName() == null && !node.processorNodeName().equals(TOPOLOGY_ROOT)) {
+            LOG.warn("Updating node {} with predecessor name {}", node, previousNode.processorNodeName());
+            node.setParentProcessorNodeName(previousNode.processorNodeName());
+        }
+
+        LOG.debug("Adding node {}", node);
+
+        final StreamsGraphNode predecessorNode = nameToGraphNode.get(node.parentProcessorNodeName());
+
+        if (predecessorNode == null) {
+            throw new IllegalStateException(
+                "Nodes should not have a null predecessor.  Name: " + node.processorNodeName() + " Type: "
+                + node.getClass().getSimpleName() + " predecessor name " + node.parentProcessorNodeName());
+        }
+
+        node.setParentNode(predecessorNode);
+        predecessorNode.addChildNode(node);
+
+        if (node.triggersRepartitioning()) {
+            repartitioningNodeToRepartitioned.put(node, new HashSet<StreamsGraphNode>());
+        } else if (node.repartitionRequired()) {
+            StreamsGraphNode currentNode = node;
+            while (currentNode.parentNode() != null) {
+                final StreamsGraphNode parentNode = currentNode.parentNode();
+                if (parentNode.triggersRepartitioning()) {
+                    repartitioningNodeToRepartitioned.get(parentNode).add(node);
+                    break;
+                }
+                currentNode = parentNode;
+            }
+        }
+
+        if (!nameToGraphNode.containsKey(node.processorNodeName())) {
+            nameToGraphNode.put(node.processorNodeName(), node);
+        }
+
+        previousNode = node;
+    }
+
+    public StreamsGraphNode getRoot() {
+        return root;
+    }
+
+    /**
+     * Used for hints when a node in the topology triggers a repartition and the repartition flag
+     * is propagated down through the descendant nodes of the topology.  This can be used to help make an
+     * optimization where the triggering node does an eager "through" operation and the child nodes can ignore
+     * the need to repartition.
+     *
+     * @return Map&lt;StreamsGraphNode, Set&lt;StreamsGraphNode&gt;&gt;
+     */
+    public Map<StreamsGraphNode, Set<StreamsGraphNode>> getRepartitioningNodeToRepartitioned() {
+        final Map<StreamsGraphNode, Set<StreamsGraphNode>> copy = new HashMap<>(repartitioningNodeToRepartitioned);
+        return Collections.unmodifiableMap(copy);
+    }
+
+    /**
+     * Used for hints when an Aggregation operation is directly output to a Sink topic.
+     * This map can be used to help optimize this case and use the Sink topic as the changelog topic
+     * for the state store of the aggregation.
+     *
+     * @return Map&lt;StreamsGraphNode, StreamSinkNode&gt;
+     */
+    public Map<StreamsGraphNode, StreamSinkNode> getStateStoreNodeToSinkNodes() {
+        final Map<StreamsGraphNode, StreamSinkNode> copy = new HashMap<>(stateStoreNodeToSinkNodes);
+        return Collections.unmodifiableMap(copy);
+    }
+
+    /**
+     * Used for mapping the Streams-generated names back to the original StreamsGraphNode,
+     * which makes it possible to resolve the predecessor-descendant relationship
+     *
+     * @return Map&lt;String, StreamsGraphNode&gt;
+     */
+    public Map<String, StreamsGraphNode> getNameToGraphNode() {
+        final Map<String, StreamsGraphNode> copy = new HashMap<>(nameToGraphNode);
+        return Collections.unmodifiableMap(copy);
+    }
+
+}
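
Note on the ancestor lookup in StreamsTopologyGraph.addNode: a node that
requires repartitioning is recorded under the nearest upstream node that
triggers repartitioning. The following is a minimal standalone sketch of that
lookup, not the code from this commit. The class RepartitionWalkSketch, its
nested Node type, and the node names ("source", "map", "filter", "groupBy")
are hypothetical stand-ins for StreamsGraphNode and DSL-generated names. The
sketch assumes the walk should visit every ancestor one step at a time and
stop at the root, whose parent is null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the graph classes in this commit; it keeps only
// the fields the ancestor walk needs.
final class RepartitionWalkSketch {

    static final class Node {
        final String name;
        final boolean triggersRepartitioning;
        Node parent;
        final List<Node> children = new ArrayList<>();

        Node(final String name, final boolean triggersRepartitioning) {
            this.name = name;
            this.triggersRepartitioning = triggersRepartitioning;
        }

        // Links the child to this node in both directions and returns the child.
        Node addChild(final Node child) {
            child.parent = this;
            children.add(child);
            return child;
        }
    }

    // Walks up one parent at a time and returns the nearest ancestor that
    // triggers repartitioning, or null if the root is reached without one.
    static Node nearestRepartitioningAncestor(final Node node) {
        Node current = node.parent;
        while (current != null) {
            if (current.triggersRepartitioning) {
                return current;
            }
            current = current.parent;
        }
        return null;
    }

    public static void main(final String[] args) {
        final Node root = new Node("root", false);
        final Node source = root.addChild(new Node("source", false));
        final Node map = source.addChild(new Node("map", true));
        final Node filter = map.addChild(new Node("filter", false));
        final Node groupBy = filter.addChild(new Node("groupBy", false));

        final Node found = nearestRepartitioningAncestor(groupBy);
        System.out.println(found == null ? "none" : found.name);
    }
}
```

In this sketch "groupBy" sits two levels below "map", so the lookup has to
check each ancestor in turn rather than every second one. Returning null at
the root lets the caller decide what to do when no upstream node triggers
repartitioning.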
