You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/22 01:17:44 UTC

samza git commit: SAMZA-1067: Physical execution plan for logic expressions written in fluent API

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 8815b0392 -> ea37b7463


SAMZA-1067: Physical execution plan for logic expressions written in fluent API

This is the initial commit for integration. No tests have been done.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ea37b746
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ea37b746
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ea37b746

Branch: refs/heads/samza-fluent-api-v1
Commit: ea37b7463f83c5c176c73d0b8f4c6a6199854f40
Parents: 8815b03
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Feb 21 17:16:34 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Feb 21 17:16:34 2017 -0800

----------------------------------------------------------------------
 .../samza/processorgraph/ExecutionPlanner.java  | 192 ++++++++++++
 .../samza/processorgraph/ProcessorGraph.java    | 294 +++++++++++++++++++
 .../samza/processorgraph/ProcessorNode.java     | 103 +++++++
 .../apache/samza/processorgraph/StreamEdge.java | 108 +++++++
 .../system/RemoteExecutionEnvironment.java      |  35 ++-
 .../apache/samza/util/ConfigInheritence.java    |  60 ++++
 .../org/apache/samza/config/JobConfig.scala     |   8 +
 .../scala/org/apache/samza/job/JobRunner.scala  |  29 +-
 .../main/scala/org/apache/samza/util/Util.scala |  25 ++
 .../processorgraph/TestProcessorGraph.java      | 198 +++++++++++++
 10 files changed, 1043 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
new file mode 100644
index 0000000..a990463
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExecutionPlanner {
+  private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
+
+  private final Config config;
+
+  /**
+   * Creates a planner backed by the given configuration.
+   *
+   * @param config the configuration kept for use by the planning steps
+   */
+  public ExecutionPlanner(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Builds the physical execution plan for a stream graph: creates the processor graph,
+   * computes the partition counts of its internal streams and creates those streams
+   * through the system admins.
+   *
+   * @param streamGraph the logical stream graph to plan
+   * @return the processor graph produced by {@link #splitStages(StreamGraph)}
+   * @throws Exception if any of the planning steps fails
+   */
+  public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
+    Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);
+
+    // create physical processors based on stream graph
+    ProcessorGraph processorGraph = splitStages(streamGraph);
+
+    // figure out the partition for internal streams
+    Multimap<String, StreamSpec> streams = calculatePartitions(processorGraph, sysAdmins);
+
+    // create the streams
+    createStreams(streams, sysAdmins);
+
+    return processorGraph;
+  }
+
+  /**
+   * Builds a {@link ProcessorGraph} with a single processor that runs the whole stream graph.
+   * Streams that are both an input and an output of the stream graph are treated as
+   * intermediate streams and become edges from the processor back to itself.
+   *
+   * @param streamGraph the logical stream graph; its input/output maps are only read
+   * @return the validated processor graph
+   * @throws Exception declared for callers; {@link ProcessorGraph#validate()} reports problems
+   *         with {@link IllegalArgumentException}
+   */
+  public ProcessorGraph splitStages(StreamGraph streamGraph) throws Exception {
+    String pipelineId = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.getOrDefault(JobConfig.JOB_ID(), "1"));
+    // For this phase, we are going to create a processor with the whole dag
+    String processorId = pipelineId; // only one processor, name it the same as pipeline itself
+
+    ProcessorGraph processorGraph = new ProcessorGraph(config);
+
+    // TODO: remove the casting once we have the correct types in StreamGraph
+    Set<StreamSpec> inSpecs = (Set) streamGraph.getInStreams().keySet();
+    Set<StreamSpec> outSpecs = (Set) streamGraph.getOutStreams().keySet();
+
+    // A map's keySet() is a live view: calling removeAll() on it would delete entries from the
+    // StreamGraph itself (or fail on an unmodifiable map), so all set arithmetic runs on copies.
+    Set<StreamSpec> sourceStreams = new HashSet<>(inSpecs);
+    Set<StreamSpec> sinkStreams = new HashSet<>(outSpecs);
+    Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
+    intStreams.retainAll(sinkStreams);
+    sourceStreams.removeAll(intStreams);
+    sinkStreams.removeAll(intStreams);
+
+    // add sources
+    sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
+
+    // add sinks
+    sinkStreams.forEach(spec -> processorGraph.addSink(spec, processorId));
+
+    // add intermediate streams
+    intStreams.forEach(spec -> processorGraph.addEdge(spec, processorId, processorId));
+
+    processorGraph.validate();
+
+    return processorGraph;
+  }
+
+  /**
+   * Assigns a partition count to each internal stream produced by a processor and collects the
+   * resulting stream specs grouped by system name. The count is the larger of the maximum
+   * partition count over the processor's in-edges and over its out-edges.
+   *
+   * @param processorGraph the graph whose internal streams get their partitions set
+   * @param sysAdmins system admins by system name, used to look up external stream partitions
+   * @return stream specs of the internal streams, keyed by system name
+   */
+  private Multimap<String, StreamSpec> calculatePartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    // fetch the external streams partition info
+    getExternalStreamPartitions(processorGraph, sysAdmins);
+
+    // TODO this algorithm assumes only one processor, and it does not consider join
+    Multimap<String, StreamSpec> specsBySystem = HashMultimap.create();
+    for (ProcessorNode processor : processorGraph.topologicalSort()) {
+      Set<StreamEdge> internalOutputs = new HashSet<>(processor.getOutEdges());
+      internalOutputs.retainAll(processorGraph.getInternalStreams());
+      if (internalOutputs.isEmpty()) {
+        // this processor produces no internal stream, nothing to size
+        continue;
+      }
+
+      int partition = Math.max(maxPartition(processor.getInEdges()), maxPartition(processor.getOutEdges()));
+      for (StreamEdge streamEdge : internalOutputs) {
+        streamEdge.setPartitions(partition);
+        specsBySystem.put(streamEdge.getSystemStream().getSystem(), createStreamSpec(streamEdge));
+      }
+    }
+
+    return specsBySystem;
+  }
+
+  /**
+   * Looks up the partition count of every source and sink stream through the admin of its
+   * system and stores it on the corresponding {@link StreamEdge}.
+   *
+   * @param processorGraph the graph providing the source and sink edges
+   * @param sysAdmins system admins by system name
+   * @throws SamzaException if a stream refers to a system that has no admin in {@code sysAdmins}
+   */
+  private void getExternalStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> externalStreams = new HashSet<>();
+    externalStreams.addAll(processorGraph.getSources());
+    externalStreams.addAll(processorGraph.getSinks());
+
+    // group the edges by system so each admin is queried once
+    Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
+    externalStreams.forEach(streamEdge -> {
+        SystemStream systemStream = streamEdge.getSystemStream();
+        externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+      });
+    for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+      if (systemAdmin == null) {
+        // fail with a descriptive error instead of a NullPointerException further down
+        throw new SamzaException(
+            String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+      }
+      Collection<StreamEdge> streamEdges = entry.getValue();
+      Map<String, StreamEdge> streamToEdge = new HashMap<>();
+      streamEdges.forEach(streamEdge -> streamToEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
+      Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet());
+      metadata.forEach((stream, data) -> {
+          streamToEdge.get(stream).setPartitions(data.getSystemStreamPartitionMetadata().size());
+        });
+    }
+  }
+
+  /**
+   * Creates every given stream through the admin of the system it belongs to.
+   *
+   * @param streams stream specs to create, keyed by system name
+   * @param sysAdmins system admins by system name
+   * @throws SamzaException if a stream refers to a system that has no admin in {@code sysAdmins}
+   */
+  private void createStreams(Multimap<String, StreamSpec> streams, Map<String, SystemAdmin> sysAdmins) {
+    for (Map.Entry<String, Collection<StreamSpec>> entry : streams.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+      if (systemAdmin == null) {
+        // fail with a descriptive error instead of a NullPointerException on createStream
+        throw new SamzaException(
+            String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+      }
+
+      for (StreamSpec stream : entry.getValue()) {
+        log.info("Creating stream {} on system {}", stream.getPhysicalName(), systemName);
+        systemAdmin.createStream(stream);
+      }
+    }
+  }
+
+  /**
+   * Returns the largest partition count among the given edges.
+   *
+   * @param edges the edges to inspect; must not be empty
+   * @return the maximum of {@link StreamEdge#getPartitions()} over {@code edges}
+   * @throws java.util.NoSuchElementException if {@code edges} is empty
+   */
+  private static int maxPartition(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitions).max().getAsInt();
+  }
+
+  /**
+   * Returns a copy of the edge's stream spec that carries the edge's partition count.
+   *
+   * @param edge the edge whose spec and partition count are combined
+   * @return the spec returned by {@code copyWithPartitionCount}
+   */
+  private static StreamSpec createStreamSpec(StreamEdge edge) {
+    return edge.getStreamSpec().copyWithPartitionCount(edge.getPartitions());
+  }
+
+  /**
+   * Obtains one {@link SystemAdmin} per configured system from that system's factory.
+   *
+   * @param config the configuration passed to the factories
+   * @return system admins keyed by system name
+   */
+  private static Map<String, SystemAdmin> getSystemAdmins(Config config) {
+    Map<String, SystemFactory> factories = getSystemFactories(config);
+    return factories.entrySet().stream().collect(
+        Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getAdmin(e.getKey(), config)));
+  }
+
+  /**
+   * Instantiates the {@link SystemFactory} of every configured system.
+   *
+   * @param config the configuration naming the systems and their factory classes
+   * @return system factories keyed by system name
+   * @throws SamzaException if a system has no factory class configured
+   */
+  private static Map<String, SystemFactory> getSystemFactories(Config config) {
+    JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+    return getSystemNames(config).stream().collect(Collectors.toMap(name -> name, name -> {
+        String factoryClassName = systemConfig.getSystemFactory(name);
+        if (factoryClassName == null) {
+          throw new SamzaException(
+              String.format("A stream uses system %s, which is missing from the configuration.", name));
+        }
+        return Util.getObj(factoryClassName);
+      }));
+  }
+
+  /**
+   * Returns the system names reported by {@link JavaSystemConfig} for the given config.
+   *
+   * @param config the configuration to read
+   * @return the system names
+   */
+  private static Collection<String> getSystemNames(Config config) {
+    return new JavaSystemConfig(config).getSystemNames();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
new file mode 100644
index 0000000..d4ad84b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ProcessorGraph represents the multi-stage Samza processors of a pipeline on the physical execution layer.
+ * High level APIs are transformed into ProcessorGraph for future plan, validation and execution.
+ *
+ * <p>The ProcessorGraph is a graph of source/sink/intermediate streams and processors are connected together. Each
+ * ProcessorNode contains the config which is required to run the processor.
+ *
+ */
+public class ProcessorGraph {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
+
+  private final Map<String, ProcessorNode> nodes = new HashMap<>();
+  private final Map<String, StreamEdge> edges = new HashMap<>();
+  private final Set<StreamEdge> sources = new HashSet<>();
+  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> internalStreams = new HashSet<>();
+  private final Config config;
+
+  /**
+   * Creates an empty graph.
+   *
+   * @param config the configuration handed to every node and edge created by this graph
+   */
+  ProcessorGraph(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Registers a source stream and connects it to the processor that consumes it.
+   * Node and edge are created on first use.
+   *
+   * @param input the source stream
+   * @param targetProcessorId id of the consuming processor
+   */
+  void addSource(StreamSpec input, String targetProcessorId) {
+    ProcessorNode consumer = getNode(targetProcessorId);
+    StreamEdge sourceEdge = getEdge(input);
+    consumer.addInEdge(sourceEdge);
+    sourceEdge.addTargetNode(consumer);
+    sources.add(sourceEdge);
+
+    log.info(sourceEdge.toString());
+  }
+
+  /**
+   * Registers a sink stream and connects it to the processor that produces it.
+   * Node and edge are created on first use.
+   *
+   * @param output the sink stream
+   * @param sourceProcessorId id of the producing processor
+   */
+  void addSink(StreamSpec output, String sourceProcessorId) {
+    ProcessorNode producer = getNode(sourceProcessorId);
+    StreamEdge sinkEdge = getEdge(output);
+    producer.addOutEdge(sinkEdge);
+    sinkEdge.addSourceNode(producer);
+    sinks.add(sinkEdge);
+
+    log.info(sinkEdge.toString());
+  }
+
+  /**
+   * Registers an internal stream that is produced by one processor and consumed by another
+   * (possibly the same one). Nodes and edge are created on first use.
+   *
+   * @param streamSpec the internal stream
+   * @param sourceProcessorId id of the producing processor
+   * @param targetProcessorId id of the consuming processor
+   */
+  void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
+    ProcessorNode producer = getNode(sourceProcessorId);
+    ProcessorNode consumer = getNode(targetProcessorId);
+    StreamEdge internalEdge = getEdge(streamSpec);
+
+    // wire the edge to its endpoints
+    internalEdge.addSourceNode(producer);
+    internalEdge.addTargetNode(consumer);
+
+    // wire the endpoints to the edge
+    producer.addOutEdge(internalEdge);
+    consumer.addInEdge(internalEdge);
+
+    internalStreams.add(internalEdge);
+
+    log.info(internalEdge.toString());
+  }
+
+  /**
+   * Returns the node registered under the given id, creating and registering it if absent.
+   *
+   * @param processorId the processor id
+   * @return the existing or newly created node
+   */
+  ProcessorNode getNode(String processorId) {
+    return nodes.computeIfAbsent(processorId, id -> new ProcessorNode(id, config));
+  }
+
+  /**
+   * Returns the edge registered under the id of the given stream spec, creating and
+   * registering it if absent.
+   *
+   * @param streamSpec the stream whose edge is requested
+   * @return the existing or newly created edge
+   */
+  StreamEdge getEdge(StreamSpec streamSpec) {
+    return edges.computeIfAbsent(streamSpec.getId(), id -> new StreamEdge(streamSpec, config));
+  }
+
+  /**
+   * Returns the processors of this graph in topological order.
+   * @return unmodifiable list of ProcessorNode(s)
+   */
+  public List<ProcessorNode> getProcessors() {
+    return Collections.unmodifiableList(topologicalSort());
+  }
+
+  /**
+   * @return an unmodifiable view of the source stream edges
+   */
+  public Set<StreamEdge> getSources() {
+    return Collections.unmodifiableSet(sources);
+  }
+
+  /**
+   * @return an unmodifiable view of the sink stream edges
+   */
+  public Set<StreamEdge> getSinks() {
+    return Collections.unmodifiableSet(sinks);
+  }
+
+  /**
+   * @return an unmodifiable view of the internal stream edges added through {@code addEdge}
+   */
+  public Set<StreamEdge> getInternalStreams() {
+    return Collections.unmodifiableSet(internalStreams);
+  }
+
+
+  /**
+   * Validate the graph: sources, sinks and internal streams must be wired consistently,
+   * and every processor must be reachable from the sources.
+   *
+   * @throws IllegalArgumentException if any of the checks fails
+   */
+  public void validate() {
+    validateSources();
+    validateSinks();
+    validateInternalStreams();
+    validateReachability();
+  }
+
+  /**
+   * Checks that every source stream has no producing processor and at least one consumer.
+   *
+   * @throws IllegalArgumentException if a source stream has a producer or has no consumer
+   */
+  private void validateSources() {
+    for (StreamEdge source : sources) {
+      boolean hasProducers = !source.getSourceNodes().isEmpty();
+      if (hasProducers) {
+        throw new IllegalArgumentException(
+            String.format("Source stream %s should not have producers.", source.getFormattedSystemStream()));
+      }
+      boolean hasConsumers = !source.getTargetNodes().isEmpty();
+      if (!hasConsumers) {
+        throw new IllegalArgumentException(
+            String.format("Source stream %s should have consumers.", source.getFormattedSystemStream()));
+      }
+    }
+  }
+
+  /**
+   * Checks that every sink stream has no consuming processor and at least one producer.
+   *
+   * @throws IllegalArgumentException if a sink stream has a consumer or has no producer
+   */
+  private void validateSinks() {
+    for (StreamEdge sink : sinks) {
+      boolean hasConsumers = !sink.getTargetNodes().isEmpty();
+      if (hasConsumers) {
+        throw new IllegalArgumentException(
+            String.format("Sink stream %s should not have consumers", sink.getFormattedSystemStream()));
+      }
+      boolean hasProducers = !sink.getSourceNodes().isEmpty();
+      if (!hasProducers) {
+        throw new IllegalArgumentException(
+            String.format("Sink stream %s should have producers", sink.getFormattedSystemStream()));
+      }
+    }
+  }
+
+  /**
+   * Checks that every edge that is neither a source nor a sink has at least one producer and
+   * at least one consumer.
+   *
+   * @throws IllegalArgumentException if such an edge lacks a producer or a consumer
+   */
+  private void validateInternalStreams() {
+    // everything that is not registered as source or sink is treated as internal
+    Set<StreamEdge> candidates = new HashSet<>(edges.values());
+    candidates.removeAll(sources);
+    candidates.removeAll(sinks);
+
+    for (StreamEdge edge : candidates) {
+      boolean connected = !edge.getSourceNodes().isEmpty() && !edge.getTargetNodes().isEmpty();
+      if (!connected) {
+        throw new IllegalArgumentException(
+            String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+      }
+    }
+  }
+
+  /**
+   * Checks that every processor can be reached from the sources.
+   *
+   * @throws IllegalArgumentException naming the processors that cannot be reached
+   */
+  private void validateReachability() {
+    final Set<ProcessorNode> reachable = findReachable();
+    if (reachable.size() == nodes.size()) {
+      return;
+    }
+
+    Set<ProcessorNode> unreachable = new HashSet<>(nodes.values());
+    unreachable.removeAll(reachable);
+    String unreachableIds = unreachable.stream().map(ProcessorNode::getId).collect(Collectors.joining(", "));
+    throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.", unreachableIds));
+  }
+
+  /**
+   * Collects, breadth first, all nodes that can be reached from the source streams.
+   * Package private for test.
+   * @return reachable set of nodes
+   */
+  Set<ProcessorNode> findReachable() {
+    Queue<ProcessorNode> pending = new ArrayDeque<>();
+    Set<ProcessorNode> visited = new HashSet<>();
+
+    // seed the search with the direct consumers of the sources
+    for (StreamEdge source : sources) {
+      List<ProcessorNode> consumers = source.getTargetNodes();
+      pending.addAll(consumers);
+      visited.addAll(consumers);
+    }
+
+    while (!pending.isEmpty()) {
+      ProcessorNode current = pending.poll();
+      for (StreamEdge outEdge : current.getOutEdges()) {
+        for (ProcessorNode target : outEdge.getTargetNodes()) {
+          // Set.add returns true only the first time a node is seen
+          if (visited.add(target)) {
+            pending.offer(target);
+          }
+        }
+      }
+    }
+
+    return visited;
+  }
+
+  /**
+   * A variation of Kahn's algorithm of topological sorting.
+   * This algorithm also takes account of the simple loops in the graph: when no node with
+   * in-degree 0 is left, the loop is broken by continuing from the remaining node with the
+   * fewest unresolved input edges.
+   * Every node appears exactly once in the result.
+   * Package private for test.
+   * @return topologically sorted ProcessorNode(s)
+   */
+  List<ProcessorNode> topologicalSort() {
+    Collection<ProcessorNode> pnodes = nodes.values();
+    Queue<ProcessorNode> q = new ArrayDeque<>();
+    Map<String, Long> indegree = new HashMap<>();
+    // every node that has ever been queued; guards against sorting a node twice
+    Set<ProcessorNode> enqueued = new HashSet<>();
+    pnodes.forEach(node -> {
+        String nid = node.getId();
+        // source streams have no producing node, so they do not count as input edges here
+        long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+        indegree.put(nid, degree);
+
+        if (degree == 0L) {
+          q.add(node);
+          enqueued.add(node);
+        }
+      });
+
+    List<ProcessorNode> sortedNodes = new ArrayList<>();
+    // nodes that were seen as the target of an already sorted node
+    Set<ProcessorNode> visited = new HashSet<>();
+    while (sortedNodes.size() < pnodes.size()) {
+      while (!q.isEmpty()) {
+        ProcessorNode node = q.poll();
+        sortedNodes.add(node);
+        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+            String nid = n.getId();
+            long degree = indegree.get(nid) - 1;
+            indegree.put(nid, degree);
+            // Set.add is false for a node that was queued before (e.g. to break a loop)
+            if (degree == 0L && enqueued.add(n)) {
+              q.add(n);
+            }
+            visited.add(n);
+          });
+      }
+
+      if (sortedNodes.size() < pnodes.size()) {
+        // The remaining nodes have circles
+        // use the following simple approach to break the circles
+        // start from the node that have been seen
+        visited.removeAll(enqueued);
+        Collection<ProcessorNode> candidates = visited;
+        if (candidates.isEmpty()) {
+          // nothing seen yet (e.g. a lone processor feeding itself): consider every unsorted
+          // node, otherwise no candidate exists and a null would be queued
+          candidates = pnodes.stream().filter(n -> !enqueued.contains(n)).collect(Collectors.toList());
+        }
+        //find out the nodes with minimal input edge
+        long min = Long.MAX_VALUE;
+        ProcessorNode minNode = null;
+        for (ProcessorNode node : candidates) {
+          long degree = indegree.get(node.getId());
+          if (degree < min) {
+            min = degree;
+            minNode = node;
+          }
+        }
+        // start from the node with minimal input edge again
+        q.add(minNode);
+        enqueued.add(minNode);
+      }
+    }
+
+    return sortedNodes;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
new file mode 100644
index 0000000..0b02377
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.ConfigInheritence;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A ProcessorNode models one Samza processor of the physical plan.
+ * It keeps the processor id, the stream edges it consumes and produces, and the config
+ * from which the processor's own config is generated.
+ */
+public class ProcessorNode {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
+  private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
+
+  private final String id;
+  private final List<StreamEdge> inEdges = new ArrayList<>();
+  private final List<StreamEdge> outEdges = new ArrayList<>();
+  private final Config config;
+
+  /**
+   * @param id the processor id
+   * @param config the config used as the basis of {@link #generateConfig()}
+   */
+  ProcessorNode(String id, Config config) {
+    this.id = id;
+    this.config = config;
+  }
+
+  /**
+   * @return the processor id
+   */
+  public  String getId() {
+    return id;
+  }
+
+  /** Appends an edge this processor consumes from. */
+  void addInEdge(StreamEdge in) {
+    inEdges.add(in);
+  }
+
+  /** Appends an edge this processor produces to. */
+  void addOutEdge(StreamEdge out) {
+    outEdges.add(out);
+  }
+
+  /**
+   * @return the live list of input edges (not a copy)
+   */
+  List<StreamEdge> getInEdges() {
+    return inEdges;
+  }
+
+  /**
+   * @return the live list of output edges (not a copy)
+   */
+  List<StreamEdge> getOutEdges() {
+    return outEdges;
+  }
+
+  /**
+   * Generates the config of this processor by combining the full config, the config derived
+   * from the graph and the config scoped under {@code processors.<id>.}, then applying
+   * {@code Util.rewriteConfig} to the result.
+   *
+   * @return the config to run this processor with
+   */
+  public Config generateConfig() {
+    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+    String scope = String.format(CONFIG_PROCESSOR_PREFIX, id);
+    Config scoped = ConfigInheritence.extractScopedConfig(config, generateProcessorConfig(), scope);
+    return Util.rewriteConfig(scoped);
+  }
+
+  /**
+   * Derives the configs that follow from the graph: the task inputs, one output entry per
+   * named out edge, and the job name (set to the processor id).
+   */
+  private Config generateProcessorConfig() {
+    List<String> inputs = inEdges.stream().map(StreamEdge::getFormattedSystemStream).collect(Collectors.toList());
+
+    // TODO temp logs for debugging
+    log.info("Processor {} has formatted inputs {}", id, inputs);
+
+    Map<String, String> generated = new HashMap<>();
+
+    // TODO hack alert: hard coded string literals!
+    generated.put("task.inputs", Joiner.on(',').join(inputs));
+
+    // TODO: DISCUSS how does the processor know it's output names?
+    for (StreamEdge out : outEdges) {
+      if (out.getName().isEmpty()) {
+        continue;
+      }
+      generated.put(String.format("task.outputs.%s.stream", out.getName()), out.getFormattedSystemStream());
+    }
+
+    generated.put(JobConfig.JOB_NAME(), id);
+
+    log.info("Processor {} has generated configs {}", id, generated);
+    return new MapConfig(generated);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
new file mode 100644
index 0000000..879d705
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
+
+
+/**
+ * A StreamEdge connects the source {@link ProcessorNode}s to the target {@link ProcessorNode}s with a stream.
+ * If it's a sink StreamEdge, the target ProcessorNode is empty.
+ * If it's a source StreamEdge, the source ProcessorNode is empty.
+ * The partition count is -1 until it is set through {@link #setPartitions(int)}.
+ */
+public class StreamEdge {
+  private final StreamSpec streamSpec;
+  private final List<ProcessorNode> sourceNodes = new ArrayList<>();
+  private final List<ProcessorNode> targetNodes = new ArrayList<>();
+  private final Config config;
+
+  private String name;
+  private int partitions = -1;
+
+  /**
+   * @param streamSpec the stream this edge stands for
+   * @param config the config kept with the edge
+   */
+  StreamEdge(StreamSpec streamSpec, Config config) {
+    this.streamSpec = streamSpec;
+    this.config = config;
+    // the name defaults to the formatted system stream and can be changed with setName
+    this.name = Util.getNameFromSystemStream(getSystemStream());
+  }
+
+  /** Appends a processor that produces to this stream. */
+  void addSourceNode(ProcessorNode sourceNode) {
+    sourceNodes.add(sourceNode);
+  }
+
+  /** Appends a processor that consumes from this stream. */
+  void addTargetNode(ProcessorNode targetNode) {
+    targetNodes.add(targetNode);
+  }
+
+  StreamSpec getStreamSpec() {
+    return streamSpec;
+  }
+
+  /**
+   * @return a new SystemStream built from the spec's system name and physical name
+   */
+  SystemStream getSystemStream() {
+    String system = streamSpec.getSystemName();
+    String stream = streamSpec.getPhysicalName();
+    return new SystemStream(system, stream);
+  }
+
+  /**
+   * @return the system stream rendered by {@code Util.getNameFromSystemStream}
+   */
+  String getFormattedSystemStream() {
+    return Util.getNameFromSystemStream(getSystemStream());
+  }
+
+  /**
+   * @return the live list of producing processors (not a copy)
+   */
+  List<ProcessorNode> getSourceNodes() {
+    return sourceNodes;
+  }
+
+  /**
+   * @return the live list of consuming processors (not a copy)
+   */
+  List<ProcessorNode> getTargetNodes() {
+    return targetNodes;
+  }
+
+  int getPartitions() {
+    return partitions;
+  }
+
+  void setPartitions(int partitions) {
+    this.partitions = partitions;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Renders the edge as {@code StreamEdge <system stream>: (<source ids>) -> (<target ids>)}.
+   */
+  @Override
+  public String toString() {
+    return "StreamEdge " + getSystemStream().toString() + ": (" + joinIds(sourceNodes) + ") -> ("
+        + joinIds(targetNodes) + ")";
+  }
+
+  /** Joins the ids of the given nodes with commas. */
+  private static String joinIds(List<ProcessorNode> processorNodes) {
+    List<String> ids = processorNodes.stream().map(ProcessorNode::getId).collect(Collectors.toList());
+    return Joiner.on(',').join(ids);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index fafa2cb..1dbc5f4 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -18,20 +18,47 @@
  */
 package org.apache.samza.system;
 
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.JobRunner;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.processorgraph.ExecutionPlanner;
+import org.apache.samza.processorgraph.ProcessorGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
  */
 public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+  private static final Logger log = LoggerFactory.getLogger(RemoteExecutionEnvironment.class);
 
   @Override public void run(StreamGraphBuilder app, Config config) {
     // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
     // TODO: actually instantiate the tasks and run the job, i.e.
-    // 1. create all input/output/intermediate topics
-    // 2. create the single job configuration
-    // 3. execute JobRunner to submit the single job for the whole graph
-  }
+    try {
+      // 1. build stream graph
+      StreamGraph streamGraph = new StreamGraphImpl();
+      app.init(streamGraph, config);
+
+      // 2. create the physical execution plan
+      ExecutionPlanner planner = new ExecutionPlanner(config);
+      ProcessorGraph processorGraph = planner.plan(streamGraph);
 
+      // 3. submit jobs for remote execution
+      processorGraph.getProcessors().forEach(processor -> {
+          Config processorConfig = processor.generateConfig();
+          String processorId = processor.getId();
+          // log the config the processor is actually started with, not the application config
+          log.info("Starting processor {} with config {}", processorId, processorConfig);
+
+          JobRunner runner = new JobRunner(processorConfig);
+          runner.run(true);
+        });
+    } catch (Exception e) {
+      // any failure while building, planning or submitting is reported as a SamzaException
+      throw new SamzaException("fail to run graph", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
new file mode 100644
index 0000000..2eba59b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Layers root, generated and prefix-scoped configs into a single effective config. */
+public class ConfigInheritence {
+  private static final Logger log = LoggerFactory.getLogger(ConfigInheritence.class);
+  private static final boolean INHERIT_ROOT_CONFIGS = true;
+
+  /**
+   * Merges, lowest precedence first: {@code fullConfig} (only while root inheritance is enabled),
+   * {@code generatedConfig}, then {@code fullConfig.subset(configPrefix)}. Entries whose value is
+   * null or empty are skipped so that they cannot override a value from an earlier layer.
+   * @return a new {@link MapConfig} containing the merged entries
+   */
+  public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+    Config prefixed = fullConfig.subset(configPrefix);
+    log.info("Prefix '{}' has extracted config {}", configPrefix, prefixed);
+    log.info("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
+
+    Config[] layers = INHERIT_ROOT_CONFIGS
+        ? new Config[] {fullConfig, generatedConfig, prefixed}
+        : new Config[] {generatedConfig, prefixed};
+    Map<String, String> merged = new HashMap<>();
+    for (Config layer : layers) {
+      layer.forEach((key, value) -> {
+          if (value != null && !value.isEmpty()) {
+            merged.put(key, value);
+          }
+        });
+    }
+    Config result = new MapConfig(merged);
+    log.info("Prefix '{}' has merged config {}", configPrefix, result);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index b64e406..a797ac2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -78,6 +78,10 @@ object JobConfig {
   // Processor Config Constants
   val PROCESSOR_ID = "processor.id"
 
+  val EXECUTION_ENV = "job.execution.env"
+
+  val STREAM_GRAPH_BUILDER = "job.stream.graph.builder"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -181,4 +185,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(mode) => mode.toBoolean
     case _ => false
   }
+
+  // Value of "job.execution.env", with "" as the fallback; JobRunner.main loads it as an ExecutionEnvironment class name.
+  def getExecutionEnv = getOrElse(JobConfig.EXECUTION_ENV, "")
+
+  // Value of "job.stream.graph.builder", with "" as the fallback; JobRunner.main loads it as a StreamGraphBuilder class name.
+  def getStreamGraphBuilder = getOrElse(JobConfig.STREAM_GRAPH_BUILDER, "")
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 022b480..a34cedb 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,17 +20,22 @@
 package org.apache.samza.job
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigRewriter
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.messages.Delete
+import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.operators.StreamGraphBuilder
+import org.apache.samza.system.ExecutionEnvironment
 import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
+
 import scala.collection.JavaConversions._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 
 
 object JobRunner extends Logging {
@@ -63,7 +68,21 @@ object JobRunner extends Logging {
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    new JobRunner(rewriteConfig(config)).run()
+
+    // start execution env if it's defined
+    val envClass: String = config.getExecutionEnv
+    if (!envClass.isEmpty) {
+      val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
+      val streamGraphBuilderClass: String = config.getStreamGraphBuilder
+      if (!streamGraphBuilderClass.isEmpty) {
+        val streamGraphBuilder: StreamGraphBuilder = ClassLoaderHelper.fromClassName(streamGraphBuilderClass)
+        env.run(streamGraphBuilder, config)
+      } else {
+        throw new SamzaException("No stream graph builder defined")
+      }
+    } else {
+      new JobRunner(rewriteConfig(config)).run()
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 9019d02..97bd22a 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -23,6 +23,7 @@ import java.net._
 import java.io._
 import java.lang.management.ManagementFactory
 import java.util.zip.CRC32
+import org.apache.samza.config.ConfigRewriter
 import org.apache.samza.{SamzaException, Partition}
 import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
 import java.util.Random
@@ -395,4 +396,28 @@ object Util extends Logging {
    * @return Scala clock function
    */
   implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
+
+  /**
+   * Applies each ConfigRewriter named in the comma-separated rewriter list,
+   * in list order, passing every rewriter the output of the previous one. If
+   * no rewriter list is defined, the config is returned unchanged. Throws a
+   * SamzaException when a listed rewriter has no class configured.
+   * @param config The config to re-write
+   * @return re-written config
+   */
+  def rewriteConfig(config: Config): Config = {
+    def rewrite(c: Config, rewriterName: String): Config = {
+      val klass = config
+              .getConfigRewriterClass(rewriterName) // looked up in the original config, not the partially re-written c
+              .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
new file mode 100644
index 0000000..7aa9f41
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class TestProcessorGraph {
+
+  ProcessorGraph graph1;
+  ProcessorGraph graph2;
+  int streamSeq = 0; // bumped by genStream() so every generated StreamSpec gets a different first argument
+
+  private StreamSpec genStream() {
+    ++streamSeq;
+
+    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
+  }
+
+  @Before
+  public void setup() {
+    /**
+     * graph1 is the example graph from wikipedia
+     *
+     * 5   7   3
+     * | / | / |
+     * v   v   |
+     * 11  8   |
+     * | \X   /
+     * v v \v
+     * 2 9 10
+     */
+    // init graph1
+    graph1 = new ProcessorGraph(null);
+    graph1.addSource(genStream(), "5");
+    graph1.addSource(genStream(), "7");
+    graph1.addSource(genStream(), "3");
+    graph1.addEdge(genStream(), "5", "11");
+    graph1.addEdge(genStream(), "7", "11");
+    graph1.addEdge(genStream(), "7", "8");
+    graph1.addEdge(genStream(), "3", "8");
+    graph1.addEdge(genStream(), "3", "10"); // drawn in the diagram and asserted in testTopologicalSort
+    graph1.addEdge(genStream(), "11", "2");
+    graph1.addEdge(genStream(), "11", "9");
+    graph1.addEdge(genStream(), "8", "9");
+    graph1.addEdge(genStream(), "11", "10");
+    graph1.addSink(genStream(), "2");
+    graph1.addSink(genStream(), "9");
+    graph1.addSink(genStream(), "10");
+
+    /**
+     * graph2 is a graph with a loop
+     * 1 -> 2 -> 3 -> 4 -> 5 -> 7
+     *      |<---6 <--|    <>
+     */
+    graph2 = new ProcessorGraph(null);
+    graph2.addSource(genStream(), "1");
+    graph2.addEdge(genStream(), "1", "2");
+    graph2.addEdge(genStream(), "2", "3");
+    graph2.addEdge(genStream(), "3", "4");
+    graph2.addEdge(genStream(), "4", "5");
+    graph2.addEdge(genStream(), "4", "6");
+    graph2.addEdge(genStream(), "6", "2");
+    graph2.addEdge(genStream(), "5", "5");
+    graph2.addEdge(genStream(), "5", "7");
+    graph2.addSink(genStream(), "7");
+  }
+
+  @Test
+  public void testAddSource() {
+    ProcessorGraph graph = new ProcessorGraph(null);
+
+    /**
+     * s1 -> 1
+     * s2 ->|
+     *
+     * s3 -> 2
+     *   |-> 3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, "1");
+    graph.addSource(s2, "1");
+    graph.addSource(s3, "2");
+    graph.addSource(s3, "3");
+
+    assertTrue(graph.getSources().size() == 3); // s3 feeds two nodes but is counted once
+
+    assertTrue(graph.getNode("1").getInEdges().size() == 2);
+    assertTrue(graph.getNode("2").getInEdges().size() == 1);
+    assertTrue(graph.getNode("3").getInEdges().size() == 1);
+
+    assertTrue(graph.getEdge(s1).getSourceNodes().size() == 0);
+    assertTrue(graph.getEdge(s1).getTargetNodes().size() == 1);
+    assertTrue(graph.getEdge(s2).getSourceNodes().size() == 0);
+    assertTrue(graph.getEdge(s2).getTargetNodes().size() == 1);
+    assertTrue(graph.getEdge(s3).getSourceNodes().size() == 0);
+    assertTrue(graph.getEdge(s3).getTargetNodes().size() == 2);
+  }
+
+  @Test
+  public void testAddSink() {
+    /**
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    ProcessorGraph graph = new ProcessorGraph(null);
+    graph.addSink(s1, "1");
+    graph.addSink(s2, "2");
+    graph.addSink(s3, "2");
+
+    assertTrue(graph.getSinks().size() == 3);
+    assertTrue(graph.getNode("1").getOutEdges().size() == 1);
+    assertTrue(graph.getNode("2").getOutEdges().size() == 2);
+
+    assertTrue(graph.getEdge(s1).getSourceNodes().size() == 1);
+    assertTrue(graph.getEdge(s1).getTargetNodes().size() == 0);
+    assertTrue(graph.getEdge(s2).getSourceNodes().size() == 1);
+    assertTrue(graph.getEdge(s2).getTargetNodes().size() == 0);
+    assertTrue(graph.getEdge(s3).getSourceNodes().size() == 1);
+    assertTrue(graph.getEdge(s3).getTargetNodes().size() == 0);
+  }
+
+  @Test
+  public void testReachable() {
+    Set<ProcessorNode> reachable1 = graph1.findReachable();
+    assertTrue(reachable1.size() == 8); // nodes 5, 7, 3, 11, 8, 2, 9, 10
+
+    Set<ProcessorNode> reachable2 = graph2.findReachable();
+    assertTrue(reachable2.size() == 7); // nodes 1..7, including those on the loops
+  }
+
+  @Test
+  public void testTopologicalSort() {
+    // test graph1: each node must be placed after every node that has an edge into it
+    List<ProcessorNode> sortedNodes1 = graph1.topologicalSort();
+    Map<String, Integer> idxMap1 = new HashMap<>();
+    for (int i = 0; i < sortedNodes1.size(); i++) {
+      idxMap1.put(sortedNodes1.get(i).getId(), i);
+    }
+
+    assertTrue(idxMap1.size() == 8);
+    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
+    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
+    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
+
+    // test graph2: the loops must not drop nodes; only orderings relative to node 1 and along 4 -> 5 -> 7 are checked
+    List<ProcessorNode> sortedNodes2 = graph2.topologicalSort();
+    Map<String, Integer> idxMap2 = new HashMap<>();
+    for (int i = 0; i < sortedNodes2.size(); i++) {
+      idxMap2.put(sortedNodes2.get(i).getId(), i);
+    }
+
+    assertTrue(idxMap2.size() == 7);
+    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
+    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+  }
+}