Posted to commits@samza.apache.org by xi...@apache.org on 2017/04/04 01:24:43 UTC

samza git commit: SAMZA-1186: Rename Processor to Job

Repository: samza
Updated Branches:
  refs/heads/master 311f9d12e -> b70ee983b


SAMZA-1186: Rename Processor to Job

Now that we have a top-level Samza application and each stage is called a job, the previously introduced "processor" naming should be renamed to "job". This includes renaming ProcessorGraph to JobGraph, ProcessorNode to JobNode, etc.
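
For orientation, a minimal sketch of the renamed planning flow (mirroring the run() flow in
RemoteApplicationRunner below; the config, streamManager, and streamGraph variables are assumed):

    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
    JobGraph jobGraph = planner.plan(streamGraph);      // was: ProcessorGraph
    for (JobNode job : jobGraph.getJobNodes()) {        // was: getProcessorNodes()
      Config jobConfig = job.generateConfig();
      new JobRunner(jobConfig).run(true);               // submit each job for remote execution
    }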

Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jm...@apache.org>

Closes #109 from xinyuiscool/SAMZA-1186


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b70ee983
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b70ee983
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b70ee983

Branch: refs/heads/master
Commit: b70ee983b7af4f5d47e1f7368d5f42e08937d16f
Parents: 311f9d1
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon Apr 3 18:24:33 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Apr 3 18:24:33 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/runtime/ApplicationRunner.java |   1 +
 .../samza/execution/ExecutionPlanner.java       |  68 ++--
 .../org/apache/samza/execution/JobGraph.java    | 368 +++++++++++++++++++
 .../org/apache/samza/execution/JobNode.java     | 132 +++++++
 .../apache/samza/execution/ProcessorGraph.java  | 359 ------------------
 .../apache/samza/execution/ProcessorNode.java   | 116 ------
 .../org/apache/samza/execution/StreamEdge.java  |  18 +-
 .../samza/operators/spec/OperatorSpec.java      |   6 +
 .../samza/runtime/RemoteApplicationRunner.java  |  38 +-
 .../samza/execution/TestExecutionPlanner.java   |  44 +--
 .../apache/samza/execution/TestJobGraph.java    | 273 ++++++++++++++
 .../samza/execution/TestProcessorGraph.java     | 230 ------------
 12 files changed, 865 insertions(+), 788 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index d4f5b00..e4e24b4 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -98,6 +98,7 @@ public abstract class ApplicationRunner {
    * Returns {@link ApplicationStatus#Running} if any of the jobs are running.
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
+   * @return the status of the application
    */
   public abstract ApplicationStatus status(StreamApplication streamApp);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index f9e44cf..47deecd 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -57,26 +57,23 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
-    // create physical processors based on stream graph
-    ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
+  public JobGraph plan(StreamGraph streamGraph) throws Exception {
+    // create physical job graph based on stream graph
+    JobGraph jobGraph = createJobGraph(streamGraph);
 
-    if (!processorGraph.getIntermediateStreams().isEmpty()) {
+    if (!jobGraph.getIntermediateStreams().isEmpty()) {
       // figure out the partitions for internal streams
-      calculatePartitions(streamGraph, processorGraph);
+      calculatePartitions(streamGraph, jobGraph);
     }
 
-    return processorGraph;
+    return jobGraph;
   }
 
   /**
    * Create the physical graph from StreamGraph
    */
-  /* package private */ ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
-    // For this phase, we are going to create a processor for the whole dag
-    String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
-
-    ProcessorGraph processorGraph = new ProcessorGraph(config);
+  /* package private */ JobGraph createJobGraph(StreamGraph streamGraph) {
+    JobGraph jobGraph = new JobGraph(streamGraph, config);
     Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
     Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
@@ -84,46 +81,51 @@ public class ExecutionPlanner {
     sourceStreams.removeAll(intStreams);
     sinkStreams.removeAll(intStreams);
 
+    // For this phase, we have a single job node for the whole dag
+    String jobName = config.get(JobConfig.JOB_NAME());
+    String jobId = config.get(JobConfig.JOB_ID(), "1");
+    JobNode node = jobGraph.getOrCreateNode(jobName, jobId);
+
     // add sources
-    sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
+    sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
 
     // add sinks
-    sinkStreams.forEach(spec -> processorGraph.addSink(spec, processorId));
+    sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));
 
     // add intermediate streams
-    intStreams.forEach(spec -> processorGraph.addIntermediateStream(spec, processorId, processorId));
+    intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
 
-    processorGraph.validate();
+    jobGraph.validate();
 
-    return processorGraph;
+    return jobGraph;
   }
 
   /**
    * Figure out the number of partitions of all streams
    */
-  /* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+  /* package private */ void calculatePartitions(StreamGraph streamGraph, JobGraph jobGraph) {
     // fetch the external streams partition info
-    updateExistingPartitions(processorGraph, streamManager);
+    updateExistingPartitions(jobGraph, streamManager);
 
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(streamGraph, processorGraph);
+    calculateJoinInputPartitions(streamGraph, jobGraph);
 
     // calculate the partitions for the rest of intermediate streams
-    calculateIntStreamPartitions(processorGraph, config);
+    calculateIntStreamPartitions(jobGraph, config);
 
     // validate all the partitions are assigned
-    validatePartitions(processorGraph);
+    validatePartitions(jobGraph);
   }
 
   /**
    * Fetch the partitions of source/sink streams and update the StreamEdges.
-   * @param processorGraph ProcessorGraph
+   * @param jobGraph {@link JobGraph}
   * @param streamManager the {@link StreamManager} to interface with the streams.
    */
-  /* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, StreamManager streamManager) {
+  /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
-    existingStreams.addAll(processorGraph.getSources());
-    existingStreams.addAll(processorGraph.getSinks());
+    existingStreams.addAll(jobGraph.getSources());
+    existingStreams.addAll(jobGraph.getSinks());
 
     Multimap<String, StreamEdge> systemToStreamEdges = HashMultimap.create();
     // group the StreamEdge(s) based on the system name
@@ -150,7 +152,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+  /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, JobGraph jobGraph) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -165,7 +167,7 @@ public class ExecutionPlanner {
     Set<OperatorSpec> visited = new HashSet<>();
 
     streamGraph.getInStreams().entrySet().forEach(entry -> {
-        StreamEdge streamEdge = processorGraph.getOrCreateEdge(entry.getKey());
+        StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
             outputStreamToJoinSpec, joinQ, visited);
@@ -248,24 +250,24 @@ public class ExecutionPlanner {
     }
   }
 
-  private static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+  private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
     int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
     if (partitions < 0) {
       // use the following simple algo to figure out the partitions
       // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
-      int maxInPartitions = maxPartition(processorGraph.getSources());
-      int maxOutPartitions = maxPartition(processorGraph.getSinks());
+      int maxInPartitions = maxPartition(jobGraph.getSources());
+      int maxOutPartitions = maxPartition(jobGraph.getSinks());
       partitions = Math.max(maxInPartitions, maxOutPartitions);
     }
-    for (StreamEdge edge : processorGraph.getIntermediateStreams()) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
       if (edge.getPartitionCount() <= 0) {
         edge.setPartitionCount(partitions);
       }
     }
   }
 
-  private static void validatePartitions(ProcessorGraph processorGraph) {
-    for (StreamEdge edge : processorGraph.getIntermediateStreams()) {
+  private static void validatePartitions(JobGraph jobGraph) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
       if (edge.getPartitionCount() <= 0) {
         throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getFormattedSystemStream()));
       }
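
As a worked example of this default, take the partition counts used in TestExecutionPlanner below
(input/output stream names are from the test):

    // inputs:  input1=64, input2=16, input3=32  -> maxInPartitions  = 64
    // outputs: output1=8,  output2=16           -> maxOutPartitions = 16
    // with JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS unset:
    //   partitions = max(64, 16) = 64 for every intermediate stream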

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
new file mode 100644
index 0000000..5c9f037
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The JobGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of jobs connected with source/sink/intermediate streams.
+ * High level APIs are transformed into JobGraph for planning, validation and execution.
+ * Source/sink streams are external streams while intermediate streams are created and managed by Samza.
+ * Note that intermediate streams are both the input and output of a JobNode in JobGraph.
+ * So the graph may have cycles and it's not a DAG.
+ */
+public class JobGraph {
+  private static final Logger log = LoggerFactory.getLogger(JobGraph.class);
+
+  private final Map<String, JobNode> nodes = new HashMap<>();
+  private final Map<String, StreamEdge> edges = new HashMap<>();
+  private final Set<StreamEdge> sources = new HashSet<>();
+  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> intermediateStreams = new HashSet<>();
+  private final Config config;
+  private final StreamGraph streamGraph;
+
+  /**
+   * The JobGraph is only constructed by the {@link ExecutionPlanner}.
+   * @param streamGraph the {@link StreamGraph} this physical plan is based on
+   * @param config Config
+   */
+  /* package private */ JobGraph(StreamGraph streamGraph, Config config) {
+    this.streamGraph = streamGraph;
+    this.config = config;
+  }
+
+  /**
+   * Add a source stream to a {@link JobNode}
+   * @param input source stream
+   * @param node the job node that consumes from the source
+   */
+  /* package private */ void addSource(StreamSpec input, JobNode node) {
+    StreamEdge edge = getOrCreateEdge(input);
+    edge.addTargetNode(node);
+    node.addInEdge(edge);
+    sources.add(edge);
+  }
+
+  /**
+   * Add a sink stream to a {@link JobNode}
+   * @param output sink stream
+   * @param node the job node that outputs to the sink
+   */
+  /* package private */ void addSink(StreamSpec output, JobNode node) {
+    StreamEdge edge = getOrCreateEdge(output);
+    edge.addSourceNode(node);
+    node.addOutEdge(edge);
+    sinks.add(edge);
+  }
+
+  /**
+   * Add an intermediate stream from source to target {@link JobNode}
+   * @param streamSpec intermediate stream
+   * @param from the source node
+   * @param to the target node
+   */
+  /* package private */ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
+    StreamEdge edge = getOrCreateEdge(streamSpec);
+    edge.addSourceNode(from);
+    edge.addTargetNode(to);
+    from.addOutEdge(edge);
+    to.addInEdge(edge);
+    intermediateStreams.add(edge);
+  }
+
+  /**
+   * Get the {@link JobNode} for the given job name and id. Create one if it does not exist.
+   * @param jobName name of the job
+   * @param jobId id of the job
+   * @return the {@link JobNode}
+   */
+  /* package private */JobNode getOrCreateNode(String jobName, String jobId) {
+    String nodeId = JobNode.createId(jobName, jobId);
+    JobNode node = nodes.get(nodeId);
+    if (node == null) {
+      node = new JobNode(jobName, jobId, config);
+      nodes.put(nodeId, node);
+    }
+    return node;
+  }
+
+  /**
+   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
+   * @param streamSpec spec of the StreamEdge
+   * @return stream edge
+   */
+  /* package private */StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
+    String streamId = streamSpec.getId();
+    StreamEdge edge = edges.get(streamId);
+    if (edge == null) {
+      edge = new StreamEdge(streamSpec);
+      edges.put(streamId, edge);
+    }
+    return edge;
+  }
+
+  /**
+   * Returns the job nodes to be executed in the topological order
+   * @return unmodifiable list of {@link JobNode}
+   */
+  public List<JobNode> getJobNodes() {
+    List<JobNode> sortedNodes = topologicalSort();
+    return Collections.unmodifiableList(sortedNodes);
+  }
+
+  /**
+   * Returns the source streams in the graph
+   * @return unmodifiable set of {@link StreamEdge}
+   */
+  public Set<StreamEdge> getSources() {
+    return Collections.unmodifiableSet(sources);
+  }
+
+  /**
+   * Return the sink streams in the graph
+   * @return unmodifiable set of {@link StreamEdge}
+   */
+  public Set<StreamEdge> getSinks() {
+    return Collections.unmodifiableSet(sinks);
+  }
+
+  /**
+   * Return the intermediate streams in the graph
+   * @return unmodifiable set of {@link StreamEdge}
+   */
+  public Set<StreamEdge> getIntermediateStreams() {
+    return Collections.unmodifiableSet(intermediateStreams);
+  }
+
+  /**
+   * Return the {@link StreamGraph}
+   * @return {@link StreamGraph}
+   */
+  public StreamGraph getStreamGraph() {
+    return this.streamGraph;
+  }
+
+
+  /**
+   * Validate that the graph has the correct topology: sources come from external streams,
+   * sinks go to external streams, and the nodes are connected with intermediate streams.
+   * Also validate that all the nodes are reachable from the sources.
+   */
+  public void validate() {
+    validateSources();
+    validateSinks();
+    validateInternalStreams();
+    validateReachability();
+  }
+
+  /**
+   * Validate that the sources have indegree 0 and outdegree greater than 0
+   */
+  private void validateSources() {
+    sources.forEach(edge -> {
+        if (!edge.getSourceNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
+        }
+        if (edge.getTargetNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
+        }
+      });
+  }
+
+  /**
+   * Validate that the sinks have outdegree 0 and indegree greater than 0
+   */
+  private void validateSinks() {
+    sinks.forEach(edge -> {
+        if (!edge.getTargetNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
+        }
+        if (edge.getSourceNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
+        }
+      });
+  }
+
+  /**
+   * Validate that the internal streams have both indegree and outdegree greater than 0
+   */
+  private void validateInternalStreams() {
+    Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
+    internalEdges.removeAll(sources);
+    internalEdges.removeAll(sinks);
+
+    internalEdges.forEach(edge -> {
+        if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+        }
+      });
+  }
+
+  /**
+   * Validate that all nodes are reachable from the sources.
+   */
+  private void validateReachability() {
+    // validate all nodes are reachable from the sources
+    final Set<JobNode> reachable = findReachable();
+    if (reachable.size() != nodes.size()) {
+      Set<JobNode> unreachable = new HashSet<>(nodes.values());
+      unreachable.removeAll(reachable);
+      throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.",
+          String.join(", ", unreachable.stream().map(JobNode::getId).collect(Collectors.toList()))));
+    }
+  }
+
+  /**
+   * Find the reachable set of nodes using BFS.
+   * @return reachable set of {@link JobNode}
+   */
+  /* package private */ Set<JobNode> findReachable() {
+    Queue<JobNode> queue = new ArrayDeque<>();
+    Set<JobNode> visited = new HashSet<>();
+
+    sources.forEach(source -> {
+        List<JobNode> next = source.getTargetNodes();
+        queue.addAll(next);
+        visited.addAll(next);
+      });
+
+    while (!queue.isEmpty()) {
+      JobNode node = queue.poll();
+      node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
+          if (!visited.contains(target)) {
+            visited.add(target);
+            queue.offer(target);
+          }
+        });
+    }
+
+    return visited;
+  }
+
+  /**
+   * A variation of Kahn's algorithm for topological sorting.
+   * This algorithm also takes into account simple loops in the graph.
+   * @return topologically sorted {@link JobNode}s
+   */
+  /* package private */ List<JobNode> topologicalSort() {
+    Collection<JobNode> pnodes = nodes.values();
+    if (pnodes.size() == 1) {
+      return new ArrayList<>(pnodes);
+    }
+
+    Queue<JobNode> q = new ArrayDeque<>();
+    Map<String, Long> indegree = new HashMap<>();
+    Set<JobNode> visited = new HashSet<>();
+    pnodes.forEach(node -> {
+        String nid = node.getId();
+        //only count the degrees of intermediate streams
+        long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+        indegree.put(nid, degree);
+
+        if (degree == 0L) {
+          // start from the nodes that have no intermediate input streams, so they only consume from sources
+          q.add(node);
+          visited.add(node);
+        }
+      });
+
+    List<JobNode> sortedNodes = new ArrayList<>();
+    Set<JobNode> reachable = new HashSet<>();
+    while (sortedNodes.size() < pnodes.size()) {
+      // Here we use an indegree-based approach to implement Kahn's algorithm for topological sort.
+      // This approach will not change the graph itself during computation.
+      //
+      // The algorithm works as follows:
+      // 1. start with the nodes that have no incoming edges (indegree 0) and insert them into the list
+      // 2. remove the edges from any node in the list to its connected nodes by decrementing the indegree of the connected nodes
+      // 3. add any new nodes with indegree 0 to the list
+      // 4. repeat steps 1-3 until there are no more nodes with indegree 0
+      //
+      while (!q.isEmpty()) {
+        JobNode node = q.poll();
+        sortedNodes.add(node);
+        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+            String nid = n.getId();
+            Long degree = indegree.get(nid) - 1;
+            indegree.put(nid, degree);
+            if (degree == 0L && !visited.contains(n)) {
+              q.add(n);
+              visited.add(n);
+            }
+            reachable.add(n);
+          });
+      }
+
+      if (sortedNodes.size() < pnodes.size()) {
+        // The remaining nodes have cycles
+        // use the following approach to break the cycles
+        // start from the nodes that are reachable from previous traverse
+        reachable.removeAll(sortedNodes);
+        if (!reachable.isEmpty()) {
+          // find the node with the fewest input edges
+          long min = Long.MAX_VALUE;
+          JobNode minNode = null;
+          for (JobNode node : reachable) {
+            Long degree = indegree.get(node.getId());
+            if (degree < min) {
+              min = degree;
+              minNode = node;
+            }
+          }
+          // start again from the node with the fewest input edges
+          q.add(minNode);
+          visited.add(minNode);
+        } else {
+          // all the remaining nodes should be reachable from sources
+          // start from sources again to find the next node that hasn't been visited
+          JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+              .filter(node -> !visited.contains(node))
+              .findAny().get();
+          q.add(nextNode);
+          visited.add(nextNode);
+        }
+      }
+    }
+
+    return sortedNodes;
+  }
+}
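
For illustration, a hedged sketch of wiring a multi-stage graph through the package-private API
above (the job names and StreamSpec variables are illustrative; in this patch the planner still
creates a single JobNode per application):

    JobGraph graph = new JobGraph(streamGraph, config);
    JobNode ingest = graph.getOrCreateNode("ingest", "1");
    JobNode enrich = graph.getOrCreateNode("enrich", "1");
    graph.addSource(inputSpec, ingest);                    // external input -> ingest
    graph.addIntermediateStream(midSpec, ingest, enrich);  // ingest -> enrich
    graph.addSink(outputSpec, enrich);                     // enrich -> external output
    graph.validate();
    List<JobNode> ordered = graph.getJobNodes();           // topological order: [ingest-1, enrich-1]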

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
new file mode 100644
index 0000000..c47a69c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
+ * to a remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
+ * A JobNode contains the input/output, and the configs for physical execution.
+ */
+public class JobNode {
+  private static final Logger log = LoggerFactory.getLogger(JobNode.class);
+  private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
+
+  private final String jobName;
+  private final String jobId;
+  private final String id;
+  private final List<StreamEdge> inEdges = new ArrayList<>();
+  private final List<StreamEdge> outEdges = new ArrayList<>();
+  private final Config config;
+
+  JobNode(String jobName, String jobId, Config config) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.id = createId(jobName, jobId);
+    this.config = config;
+  }
+
+  public  String getId() {
+    return id;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  void addInEdge(StreamEdge in) {
+    inEdges.add(in);
+  }
+
+  void addOutEdge(StreamEdge out) {
+    outEdges.add(out);
+  }
+
+  List<StreamEdge> getInEdges() {
+    return inEdges;
+  }
+
+  List<StreamEdge> getOutEdges() {
+    return outEdges;
+  }
+
+  public Config generateConfig() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), jobName);
+
+    List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
+    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+    log.info("Job {} has generated configs {}", jobName, configs);
+
+    String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
+    // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
+    return Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix));
+  }
+
+  /**
+   * This function extracts the subset of configs from the full config, and uses it to override the generated configs
+   * from the job.
+   * @param fullConfig full config
+   * @param generatedConfig config generated for the job
+   * @param configPrefix prefix to extract the subset of the config overrides
+   * @return config that merges the generated configs and overrides
+   */
+  private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+    Config scopedConfig = fullConfig.subset(configPrefix);
+
+    Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
+    // Strip empty configs so they don't override the configs before them.
+    Map<String, String> mergedConfig = new HashMap<>();
+    for (Map<String, String> config : configPrecedence) {
+      for (Map.Entry<String, String> property : config.entrySet()) {
+        String value = property.getValue();
+        if (!(value == null || value.isEmpty())) {
+          mergedConfig.put(property.getKey(), property.getValue());
+        }
+      }
+    }
+    scopedConfig = new MapConfig(mergedConfig);
+    log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
+
+    return scopedConfig;
+  }
+
+  static String createId(String jobName, String jobId) {
+    return String.format("%s-%s", jobName, jobId);
+  }
+}
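
A hedged example of the precedence in extractScopedConfig (the "page-views" job name and the
config values are hypothetical). Later entries in {fullConfig, generatedConfig, scopedConfig}
win, and scopedConfig is the "jobs.page-views." subset of the full config with the prefix stripped:

    // full config:      task.inputs=kafka.ignored, jobs.page-views.task.inputs=kafka.views
    // generated config: job.name=page-views, task.inputs=kafka.pages
    // merged (relevant keys): job.name=page-views, task.inputs=kafka.views  <- scoped override wins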

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
deleted file mode 100644
index 13755ae..0000000
--- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.samza.config.Config;
-import org.apache.samza.system.StreamSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
- * It contains the topology of execution processors connected with source/sink/intermediate streams.
- * High level APIs are transformed into ProcessorGraph for planning, validation and execution.
- * Source/sink streams are external streams while intermediate streams are created and managed by Samza.
- * Note that intermediate streams are both the input and output of a ProcessorNode in ProcessorGraph.
- * So the graph may have cycles and it's not a DAG.
- */
-public class ProcessorGraph {
-  private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
-
-  private final Map<String, ProcessorNode> nodes = new HashMap<>();
-  private final Map<String, StreamEdge> edges = new HashMap<>();
-  private final Set<StreamEdge> sources = new HashSet<>();
-  private final Set<StreamEdge> sinks = new HashSet<>();
-  private final Set<StreamEdge> intermediateStreams = new HashSet<>();
-  private final Config config;
-
-  /**
-   * The ProcessorGraph is only constructed by the {@link ExecutionPlanner}.
-   * @param config Config
-   */
-  /* package private */ ProcessorGraph(Config config) {
-    this.config = config;
-  }
-
-  /**
-   * Add a source stream to a {@link ProcessorNode}
-   * @param input source stream
-   * @param targetProcessorId id of the {@link ProcessorNode}
-   */
-  /* package private */ void addSource(StreamSpec input, String targetProcessorId) {
-    ProcessorNode node = getOrCreateProcessor(targetProcessorId);
-    StreamEdge edge = getOrCreateEdge(input);
-    edge.addTargetNode(node);
-    node.addInEdge(edge);
-    sources.add(edge);
-  }
-
-  /**
-   * Add a sink stream to a {@link ProcessorNode}
-   * @param output sink stream
-   * @param sourceProcessorId id of the {@link ProcessorNode}
-   */
-  /* package private */ void addSink(StreamSpec output, String sourceProcessorId) {
-    ProcessorNode node = getOrCreateProcessor(sourceProcessorId);
-    StreamEdge edge = getOrCreateEdge(output);
-    edge.addSourceNode(node);
-    node.addOutEdge(edge);
-    sinks.add(edge);
-  }
-
-  /**
-   * Add an intermediate stream from source to target {@link ProcessorNode}
-   * @param streamSpec intermediate stream
-   * @param sourceProcessorId id of the source {@link ProcessorNode}
-   * @param targetProcessorId id of the target {@link ProcessorNode}
-   */
-  /* package private */ void addIntermediateStream(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
-    ProcessorNode sourceNode = getOrCreateProcessor(sourceProcessorId);
-    ProcessorNode targetNode = getOrCreateProcessor(targetProcessorId);
-    StreamEdge edge = getOrCreateEdge(streamSpec);
-    edge.addSourceNode(sourceNode);
-    edge.addTargetNode(targetNode);
-    sourceNode.addOutEdge(edge);
-    targetNode.addInEdge(edge);
-    intermediateStreams.add(edge);
-  }
-
-  /**
-   * Get the {@link ProcessorNode} for an id. Create one if it does not exist.
-   * @param processorId id of the processor
-   * @return processor node
-   */
-  /* package private */ProcessorNode getOrCreateProcessor(String processorId) {
-    ProcessorNode node = nodes.get(processorId);
-    if (node == null) {
-      node = new ProcessorNode(processorId, config);
-      nodes.put(processorId, node);
-    }
-    return node;
-  }
-
-  /**
-   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
-   * @param streamSpec spec of the StreamEdge
-   * @return stream edge
-   */
-  /* package private */StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
-    String streamId = streamSpec.getId();
-    StreamEdge edge = edges.get(streamId);
-    if (edge == null) {
-      edge = new StreamEdge(streamSpec);
-      edges.put(streamId, edge);
-    }
-    return edge;
-  }
-
-  /**
-   * Returns the processors to be executed in the topological order
-   * @return unmodifiable list of {@link ProcessorNode}
-   */
-  public List<ProcessorNode> getProcessorNodes() {
-    List<ProcessorNode> sortedNodes = topologicalSort();
-    return Collections.unmodifiableList(sortedNodes);
-  }
-
-  /**
-   * Returns the source streams in the graph
-   * @return unmodifiable set of {@link StreamEdge}
-   */
-  public Set<StreamEdge> getSources() {
-    return Collections.unmodifiableSet(sources);
-  }
-
-  /**
-   * Return the sink streams in the graph
-   * @return unmodifiable set of {@link StreamEdge}
-   */
-  public Set<StreamEdge> getSinks() {
-    return Collections.unmodifiableSet(sinks);
-  }
-
-  /**
-   * Return the intermediate streams in the graph
-   * @return unmodifiable set of {@link StreamEdge}
-   */
-  public Set<StreamEdge> getIntermediateStreams() {
-    return Collections.unmodifiableSet(intermediateStreams);
-  }
-
-
-  /**
-   * Validate that the graph has the correct topology: sources come from external streams,
-   * sinks go to external streams, and the nodes are connected with intermediate streams.
-   * Also validate that all the nodes are reachable from the sources.
-   */
-  public void validate() {
-    validateSources();
-    validateSinks();
-    validateInternalStreams();
-    validateReachability();
-  }
-
-  /**
-   * Validate that the sources have indegree 0 and outdegree greater than 0
-   */
-  private void validateSources() {
-    sources.forEach(edge -> {
-        if (!edge.getSourceNodes().isEmpty()) {
-          throw new IllegalArgumentException(
-              String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
-        }
-        if (edge.getTargetNodes().isEmpty()) {
-          throw new IllegalArgumentException(
-              String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
-        }
-      });
-  }
-
-  /**
-   * Validate that the sinks have outdegree 0 and indegree greater than 0
-   */
-  private void validateSinks() {
-    sinks.forEach(edge -> {
-        if (!edge.getTargetNodes().isEmpty()) {
-          throw new IllegalArgumentException(
-              String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
-        }
-        if (edge.getSourceNodes().isEmpty()) {
-          throw new IllegalArgumentException(
-              String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
-        }
-      });
-  }
-
-  /**
-   * Validate that the internal streams have both indegree and outdegree greater than 0
-   */
-  private void validateInternalStreams() {
-    Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
-    internalEdges.removeAll(sources);
-    internalEdges.removeAll(sinks);
-
-    internalEdges.forEach(edge -> {
-        if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
-          throw new IllegalArgumentException(
-              String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
-        }
-      });
-  }
-
-  /**
-   * Validate that all nodes are reachable from the sources.
-   */
-  private void validateReachability() {
-    // validate all nodes are reachable from the sources
-    final Set<ProcessorNode> reachable = findReachable();
-    if (reachable.size() != nodes.size()) {
-      Set<ProcessorNode> unreachable = new HashSet<>(nodes.values());
-      unreachable.removeAll(reachable);
-      throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.",
-          String.join(", ", unreachable.stream().map(ProcessorNode::getId).collect(Collectors.toList()))));
-    }
-  }
-
-  /**
-   * Find the reachable set of nodes using BFS.
-   * @return reachable set of {@link ProcessorNode}
-   */
-  /* package private */ Set<ProcessorNode> findReachable() {
-    Queue<ProcessorNode> queue = new ArrayDeque<>();
-    Set<ProcessorNode> visited = new HashSet<>();
-
-    sources.forEach(source -> {
-        List<ProcessorNode> next = source.getTargetNodes();
-        queue.addAll(next);
-        visited.addAll(next);
-      });
-
-    while (!queue.isEmpty()) {
-      ProcessorNode node = queue.poll();
-      node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
-          if (!visited.contains(target)) {
-            visited.add(target);
-            queue.offer(target);
-          }
-        });
-    }
-
-    return visited;
-  }
-
-  /**
-   * A variation of Kahn's algorithm for topological sorting.
-   * This algorithm also takes into account simple loops in the graph.
-   * @return topologically sorted {@link ProcessorNode}s
-   */
-  /* package private */ List<ProcessorNode> topologicalSort() {
-    Collection<ProcessorNode> pnodes = nodes.values();
-    if (pnodes.size() == 1) {
-      return new ArrayList<>(pnodes);
-    }
-
-    Queue<ProcessorNode> q = new ArrayDeque<>();
-    Map<String, Long> indegree = new HashMap<>();
-    Set<ProcessorNode> visited = new HashSet<>();
-    pnodes.forEach(node -> {
-        String nid = node.getId();
-        //only count the degrees of intermediate streams
-        long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
-        indegree.put(nid, degree);
-
-        if (degree == 0L) {
-          // start from the nodes that have no intermediate input streams, so they only consume from sources
-          q.add(node);
-          visited.add(node);
-        }
-      });
-
-    List<ProcessorNode> sortedNodes = new ArrayList<>();
-    Set<ProcessorNode> reachable = new HashSet<>();
-    while (sortedNodes.size() < pnodes.size()) {
-      // Here we use an indegree-based approach to implement Kahn's algorithm for topological sort.
-      // This approach will not change the graph itself during computation.
-      //
-      // The algorithm works as follows:
-      // 1. start with the nodes that have no incoming edges (indegree 0) and insert them into the list
-      // 2. remove the edges from any node in the list to its connected nodes by decrementing the indegree of the connected nodes
-      // 3. add any new nodes with indegree 0 to the list
-      // 4. repeat steps 1-3 until there are no more nodes with indegree 0
-      //
-      while (!q.isEmpty()) {
-        ProcessorNode node = q.poll();
-        sortedNodes.add(node);
-        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
-            String nid = n.getId();
-            Long degree = indegree.get(nid) - 1;
-            indegree.put(nid, degree);
-            if (degree == 0L && !visited.contains(n)) {
-              q.add(n);
-              visited.add(n);
-            }
-            reachable.add(n);
-          });
-      }
-
-      if (sortedNodes.size() < pnodes.size()) {
-        // The remaining nodes have cycles
-        // use the following approach to break the cycles
-        // start from the nodes that are reachable from previous traverse
-        reachable.removeAll(sortedNodes);
-        if (!reachable.isEmpty()) {
-          // find the node with the fewest input edges
-          long min = Long.MAX_VALUE;
-          ProcessorNode minNode = null;
-          for (ProcessorNode node : reachable) {
-            Long degree = indegree.get(node.getId());
-            if (degree < min) {
-              min = degree;
-              minNode = node;
-            }
-          }
-          // start again from the node with the fewest input edges
-          q.add(minNode);
-          visited.add(minNode);
-        } else {
-          // all the remaining nodes should be reachable from sources
-          // start from sources again to find the next node that hasn't been visited
-          ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
-              .filter(node -> !visited.contains(node))
-              .findAny().get();
-          q.add(nextNode);
-          visited.add(nextNode);
-        }
-      }
-    }
-
-    return sortedNodes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java
deleted file mode 100644
index 3cb695f..0000000
--- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
- * to a remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
- * A ProcessorNode contains the input/output, and the configs for physical execution.
- */
-public class ProcessorNode {
-  private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
-  private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
-
-  private final String id;
-  private final List<StreamEdge> inEdges = new ArrayList<>();
-  private final List<StreamEdge> outEdges = new ArrayList<>();
-  private final Config config;
-
-  ProcessorNode(String id, Config config) {
-    this.id = id;
-    this.config = config;
-  }
-
-  public  String getId() {
-    return id;
-  }
-
-  void addInEdge(StreamEdge in) {
-    inEdges.add(in);
-  }
-
-  void addOutEdge(StreamEdge out) {
-    outEdges.add(out);
-  }
-
-  List<StreamEdge> getInEdges() {
-    return inEdges;
-  }
-
-  List<StreamEdge> getOutEdges() {
-    return outEdges;
-  }
-
-  public Config generateConfig() {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(JobConfig.JOB_NAME(), id);
-
-    List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
-    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
-    log.info("Processor {} has generated configs {}", id, configs);
-
-    String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
-    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
-    return Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix));
-  }
-
-  /**
-   * This function extract the subset of configs from the full config, and use it to override the generated configs
-   * from the processor.
-   * @param fullConfig full config
-   * @param generatedConfig config generated from the processor
-   * @param configPrefix prefix to extract the subset of the config overrides
-   * @return config that merges the generated configs and overrides
-   */
-  private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
-    Config scopedConfig = fullConfig.subset(configPrefix);
-
-    Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
-    // Strip empty configs so they don't override the configs before them.
-    Map<String, String> mergedConfig = new HashMap<>();
-    for (Map<String, String> config : configPrecedence) {
-      for (Map.Entry<String, String> property : config.entrySet()) {
-        String value = property.getValue();
-        if (!(value == null || value.isEmpty())) {
-          mergedConfig.put(property.getKey(), property.getValue());
-        }
-      }
-    }
-    scopedConfig = new MapConfig(mergedConfig);
-    log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
-
-    return scopedConfig;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 3215a22..9596d0f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -27,16 +27,16 @@ import org.apache.samza.util.Util;
 
 
 /**
- * A StreamEdge connects the source {@link ProcessorNode}s to the target {@link ProcessorNode}s with a stream.
- * If it's a sink StreamEdge, the target ProcessorNode is empty.
- * If it's a source StreamEdge, the source ProcessorNode is empty.
+ * A StreamEdge connects the source {@link JobNode}s to the target {@link JobNode}s with a stream.
+ * If it's a sink StreamEdge, the list of target JobNodes is empty.
+ * If it's a source StreamEdge, the list of source JobNodes is empty.
  */
 public class StreamEdge {
   public static final int PARTITIONS_UNKNOWN = -1;
 
   private final StreamSpec streamSpec;
-  private final List<ProcessorNode> sourceNodes = new ArrayList<>();
-  private final List<ProcessorNode> targetNodes = new ArrayList<>();
+  private final List<JobNode> sourceNodes = new ArrayList<>();
+  private final List<JobNode> targetNodes = new ArrayList<>();
 
   private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
@@ -46,11 +46,11 @@ public class StreamEdge {
     this.name = Util.getNameFromSystemStream(getSystemStream());
   }
 
-  void addSourceNode(ProcessorNode sourceNode) {
+  void addSourceNode(JobNode sourceNode) {
     sourceNodes.add(sourceNode);
   }
 
-  void addTargetNode(ProcessorNode targetNode) {
+  void addTargetNode(JobNode targetNode) {
     targetNodes.add(targetNode);
   }
 
@@ -70,11 +70,11 @@ public class StreamEdge {
     return Util.getNameFromSystemStream(getSystemStream());
   }
 
-  List<ProcessorNode> getSourceNodes() {
+  List<JobNode> getSourceNodes() {
     return sourceNodes;
   }
 
-  List<ProcessorNode> getTargetNodes() {
+  List<JobNode> getTargetNodes() {
     return targetNodes;
   }
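
A small sketch of the direction semantics in the StreamEdge javadoc above (graph, node, and
inputSpec are assumed to exist):

    graph.addSource(inputSpec, node);                        // inputSpec is an external input
    StreamEdge sourceEdge = graph.getOrCreateEdge(inputSpec);
    sourceEdge.getSourceNodes();  // empty: no JobNode produces a source stream
    sourceEdge.getTargetNodes();  // [node]: the JobNodes that consume it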
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 1444662..5a125a2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -53,6 +53,12 @@ public interface OperatorSpec<OM> {
   MessageStreamImpl<OM> getNextStream();
 
   /**
+   * Returns the ID of this operator
+   * @return the operator ID
+   */
+  int getOpId();
+
+  /**
    * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
    *
    * @param config  the {@link Config} object for this task

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 9fe493e..cd6c492 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -26,8 +26,8 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.execution.ExecutionPlanner;
-import org.apache.samza.execution.ProcessorGraph;
-import org.apache.samza.execution.ProcessorNode;
+import org.apache.samza.execution.JobGraph;
+import org.apache.samza.execution.JobNode;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
@@ -60,19 +60,19 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   public void run(StreamApplication app) {
     try {
       // 1. initialize and plan
-      ProcessorGraph processorGraph = getExecutionPlan(app);
+      JobGraph jobGraph = getExecutionPlan(app);
 
       // 2. create the necessary streams
-      List<StreamSpec> streams = processorGraph.getIntermediateStreams().stream()
+      List<StreamSpec> streams = jobGraph.getIntermediateStreams().stream()
           .map(streamEdge -> streamEdge.getStreamSpec())
           .collect(Collectors.toList());
       streamManager.createStreams(streams);
 
       // 3. submit jobs for remote execution
-      processorGraph.getProcessorNodes().forEach(processor -> {
-          Config processorConfig = processor.generateConfig();
-          log.info("Starting processor {} with config {}", processor.getId(), processorConfig);
-          JobRunner runner = new JobRunner(processorConfig);
+      jobGraph.getJobNodes().forEach(job -> {
+          Config jobConfig = job.generateConfig();
+          log.info("Starting job {} with config {}", job.getId(), jobConfig);
+          JobRunner runner = new JobRunner(jobConfig);
           runner.run(true);
         });
     } catch (Throwable t) {
@@ -83,12 +83,12 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication app) {
     try {
-      ProcessorGraph processorGraph = getExecutionPlan(app);
+      JobGraph jobGraph = getExecutionPlan(app);
 
-      processorGraph.getProcessorNodes().forEach(processor -> {
-          Config processorConfig = processor.generateConfig();
-          log.info("Killing processor {}", processor.getId());
-          JobRunner runner = new JobRunner(processorConfig);
+      jobGraph.getJobNodes().forEach(job -> {
+          Config jobConfig = job.generateConfig();
+          log.info("Killing job {}", job.getId());
+          JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
     } catch (Throwable t) {
@@ -102,12 +102,12 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       boolean finished = false;
       boolean unsuccessfulFinish = false;
 
-      ProcessorGraph processorGraph = getExecutionPlan(app);
-      for (ProcessorNode processor : processorGraph.getProcessorNodes()) {
-        Config processorConfig = processor.generateConfig();
-        JobRunner runner = new JobRunner(processorConfig);
+      JobGraph jobGraph = getExecutionPlan(app);
+      for (JobNode job : jobGraph.getJobNodes()) {
+        Config jobConfig = job.generateConfig();
+        JobRunner runner = new JobRunner(jobConfig);
         ApplicationStatus status = runner.status();
-        log.debug("Status is {} for processor {}", new Object[]{status, processor.getId()});
+        log.debug("Status is {} for job {}", new Object[]{status, job.getId()});
 
         switch (status) {
           case Running:
@@ -133,7 +133,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
     }
   }
 
-  private ProcessorGraph getExecutionPlan(StreamApplication app) throws Exception {
+  private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
     // build stream graph
     StreamGraph streamGraph = new StreamGraphImpl(this, config);
     app.init(streamGraph, config);

http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index dc828d9..e524ba1 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -227,26 +227,26 @@ public class TestExecutionPlanner {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
 
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    assertTrue(processorGraph.getSources().size() == 3);
-    assertTrue(processorGraph.getSinks().size() == 2);
-    assertTrue(processorGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    assertTrue(jobGraph.getSources().size() == 3);
+    assertTrue(jobGraph.getSinks().size() == 2);
+    assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
   }
 
   @Test
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
-    ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
-    assertTrue(processorGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
-    assertTrue(processorGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
-    assertTrue(processorGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
-    assertTrue(processorGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
-    assertTrue(processorGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
+    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+    assertTrue(jobGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
+    assertTrue(jobGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
+    assertTrue(jobGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
+    assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
+    assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
 
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == -1);
       });
   }
@@ -255,13 +255,13 @@ public class TestExecutionPlanner {
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
-    ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
+    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, jobGraph);
 
     // the partitions should be the same as input1
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == 64);
       });
   }
@@ -274,11 +274,11 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamGraph streamGraph = createSimpleGraph();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, processorGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, jobGraph);
 
     // the intermediate streams should use the configured default partition count
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS);
       });
   }
@@ -287,11 +287,11 @@ public class TestExecutionPlanner {
   public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createSimpleGraph();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, processorGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, jobGraph);
 
     // the partitions should be the max of input1 and output1
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
       });
   }

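The assertions above pin down how intermediate streams get their partition
counts: take the largest count among the job's existing input and output
streams (64, the max of input1 and output1), with a configured value
involved in testDefaultPartitions. A minimal sketch of one plausible
reading of that rule; getPartitionCount() appears in the tests, while
setPartitionCount() and the precedence between the configured value and
the computed max are assumptions, since ExecutionPlanner's body is not
shown in this diff:

    // Sketch only: derive intermediate-stream partitions from existing edges.
    int max = -1;
    for (StreamEdge edge : jobGraph.getSources()) {
      max = Math.max(max, edge.getPartitionCount());
    }
    for (StreamEdge edge : jobGraph.getSinks()) {
      max = Math.max(max, edge.getPartitionCount());
    }
    // Assumed fallback: use the configured count when no edge has a known
    // partition count; the real precedence lives in ExecutionPlanner.
    int partitions = (max > 0) ? max : DEFAULT_PARTITIONS;
    for (StreamEdge intermediate : jobGraph.getIntermediateStreams()) {
      intermediate.setPartitionCount(partitions);
    }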
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
new file mode 100644
index 0000000..d829b64
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobGraph {
+
+  JobGraph graph1;
+  JobGraph graph2;
+  JobGraph graph3;
+  JobGraph graph4;
+  int streamSeq = 0;
+
+  private StreamSpec genStream() {
+    ++streamSeq;
+
+    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
+  }
+
+  /**
+   * graph1 is the example graph from Wikipedia
+   *
+   * 5   7   3
+   * | / | / |
+   * v   v   |
+   * 11  8   |
+   * | \X   /
+   * v v \v
+   * 2 9 10
+   */
+  private void createGraph1() {
+    graph1 = new JobGraph(null, null);
+
+    JobNode n2 = graph1.getOrCreateNode("2", "1");
+    JobNode n3 = graph1.getOrCreateNode("3", "1");
+    JobNode n5 = graph1.getOrCreateNode("5", "1");
+    JobNode n7 = graph1.getOrCreateNode("7", "1");
+    JobNode n8 = graph1.getOrCreateNode("8", "1");
+    JobNode n9 = graph1.getOrCreateNode("9", "1");
+    JobNode n10 = graph1.getOrCreateNode("10", "1");
+    JobNode n11 = graph1.getOrCreateNode("11", "1");
+
+    graph1.addSource(genStream(), n5);
+    graph1.addSource(genStream(), n7);
+    graph1.addSource(genStream(), n3);
+    graph1.addIntermediateStream(genStream(), n5, n11);
+    graph1.addIntermediateStream(genStream(), n7, n11);
+    graph1.addIntermediateStream(genStream(), n7, n8);
+    graph1.addIntermediateStream(genStream(), n3, n8);
+    graph1.addIntermediateStream(genStream(), n11, n2);
+    graph1.addIntermediateStream(genStream(), n11, n9);
+    graph1.addIntermediateStream(genStream(), n8, n9);
+    graph1.addIntermediateStream(genStream(), n11, n10);
+    graph1.addSink(genStream(), n2);
+    graph1.addSink(genStream(), n9);
+    graph1.addSink(genStream(), n10);
+  }
+
+  /**
+   * graph2 is a graph with a loop (2 -> 3 -> 4 -> 6 -> 2) and a self-loop on 5 (drawn as <>)
+   * 1 -> 2 -> 3 -> 4 -> 5 -> 7
+   *      |<---6 <--|    <>
+   */
+  private void createGraph2() {
+    graph2 = new JobGraph(null, null);
+
+    JobNode n1 = graph2.getOrCreateNode("1", "1");
+    JobNode n2 = graph2.getOrCreateNode("2", "1");
+    JobNode n3 = graph2.getOrCreateNode("3", "1");
+    JobNode n4 = graph2.getOrCreateNode("4", "1");
+    JobNode n5 = graph2.getOrCreateNode("5", "1");
+    JobNode n6 = graph2.getOrCreateNode("6", "1");
+    JobNode n7 = graph2.getOrCreateNode("7", "1");
+
+    graph2.addSource(genStream(), n1);
+    graph2.addIntermediateStream(genStream(), n1, n2);
+    graph2.addIntermediateStream(genStream(), n2, n3);
+    graph2.addIntermediateStream(genStream(), n3, n4);
+    graph2.addIntermediateStream(genStream(), n4, n5);
+    graph2.addIntermediateStream(genStream(), n4, n6);
+    graph2.addIntermediateStream(genStream(), n6, n2);
+    graph2.addIntermediateStream(genStream(), n5, n5);
+    graph2.addIntermediateStream(genStream(), n5, n7);
+    graph2.addSink(genStream(), n7);
+  }
+
+  /**
+   * graph3 is a graph with two self-loops
+   * 1<->1 -> 2<->2
+   */
+  private void createGraph3() {
+    graph3 = new JobGraph(null, null);
+
+    JobNode n1 = graph3.getOrCreateNode("1", "1");
+    JobNode n2 = graph3.getOrCreateNode("2", "1");
+
+    graph3.addSource(genStream(), n1);
+    graph3.addIntermediateStream(genStream(), n1, n1);
+    graph3.addIntermediateStream(genStream(), n1, n2);
+    graph3.addIntermediateStream(genStream(), n2, n2);
+  }
+
+  /**
+   * graph4 is a graph with a single self-looping node
+   * 1<->1
+   */
+  private void createGraph4() {
+    graph4 = new JobGraph(null, null);
+
+    JobNode n1 = graph4.getOrCreateNode("1", "1");
+
+    graph4.addSource(genStream(), n1);
+    graph4.addIntermediateStream(genStream(), n1, n1);
+  }
+
+  @Before
+  public void setup() {
+    createGraph1();
+    createGraph2();
+    createGraph3();
+    createGraph4();
+  }
+
+  @Test
+  public void testAddSource() {
+    JobGraph graph = new JobGraph(null, null);
+
+    /**
+     * s1 -> 1
+     * s2 ->|
+     *
+     * s3 -> 2
+     *   |-> 3
+     */
+    JobNode n1 = graph.getOrCreateNode("1", "1");
+    JobNode n2 = graph.getOrCreateNode("2", "1");
+    JobNode n3 = graph.getOrCreateNode("3", "1");
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, n1);
+    graph.addSource(s2, n1);
+    graph.addSource(s3, n2);
+    graph.addSource(s3, n3);
+
+    assertTrue(graph.getSources().size() == 3);
+
+    assertTrue(graph.getOrCreateNode("1", "1").getInEdges().size() == 2);
+    assertTrue(graph.getOrCreateNode("2", "1").getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("3", "1").getInEdges().size() == 1);
+
+    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
+  }
+
+  @Test
+  public void testAddSink() {
+    /**
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    JobGraph graph = new JobGraph(null, null);
+    JobNode n1 = graph.getOrCreateNode("1", "1");
+    JobNode n2 = graph.getOrCreateNode("2", "1");
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSink(s1, n1);
+    graph.addSink(s2, n2);
+    graph.addSink(s3, n2);
+
+    assertTrue(graph.getSinks().size() == 3);
+    assertTrue(graph.getOrCreateNode("1", "1").getOutEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("2", "1").getOutEdges().size() == 2);
+
+    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
+  }
+
+  @Test
+  public void testReachable() {
+    Set<JobNode> reachable1 = graph1.findReachable();
+    assertTrue(reachable1.size() == 8);
+
+    Set<JobNode> reachable2 = graph2.findReachable();
+    assertTrue(reachable2.size() == 7);
+  }
+
+  @Test
+  public void testTopologicalSort() {
+
+    // test graph1
+    List<JobNode> sortedNodes1 = graph1.topologicalSort();
+    Map<String, Integer> idxMap1 = new HashMap<>();
+    for (int i = 0; i < sortedNodes1.size(); i++) {
+      idxMap1.put(sortedNodes1.get(i).getJobName(), i);
+    }
+
+    assertTrue(idxMap1.size() == 8);
+    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
+    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
+    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
+
+    // test graph2
+    List<JobNode> sortedNodes2 = graph2.topologicalSort();
+    Map<String, Integer> idxMap2 = new HashMap<>();
+    for (int i = 0; i < sortedNodes2.size(); i++) {
+      idxMap2.put(sortedNodes2.get(i).getJobName(), i);
+    }
+
+    assertTrue(idxMap2.size() == 7);
+    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
+    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+
+    //test graph3
+    List<JobNode> sortedNodes3 = graph3.topologicalSort();
+    assertTrue(sortedNodes3.size() == 2);
+    assertEquals(sortedNodes3.get(0).getJobName(), "1");
+    assertEquals(sortedNodes3.get(1).getJobName(), "2");
+
+    //test graph4
+    List<JobNode> sortedNodes4 = graph4.topologicalSort();
+    assertTrue(sortedNodes4.size() == 1);
+    assertEquals(sortedNodes4.get(0).getJobName(), "1");
+  }
+}

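Note that topologicalSort() above is exercised on graphs containing cycles
(graph2 through graph4) and still orders every node, so JobGraph must break
cycles by some means this diff does not show. For comparison, a plain
Kahn's-algorithm sketch over a generic adjacency map; the class and method
names here (TopoSketch, kahnSort) are hypothetical, and unlike JobGraph
this version simply drops nodes left on a cycle:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class TopoSketch {
      // Illustrative only; not the JobGraph implementation.
      static List<String> kahnSort(Map<String, Set<String>> adj) {
        Map<String, Integer> inDegree = new HashMap<>();
        adj.keySet().forEach(n -> inDegree.putIfAbsent(n, 0));
        adj.forEach((src, dsts) -> dsts.forEach(dst -> {
            if (!dst.equals(src)) {               // skip self-loops (1<->1)
              inDegree.merge(dst, 1, Integer::sum);
            }
          }));
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((n, d) -> {
            if (d == 0) {
              ready.add(n);                       // sources start the order
            }
          });
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
          String n = ready.poll();
          sorted.add(n);
          for (String dst : adj.getOrDefault(n, Collections.emptySet())) {
            if (!dst.equals(n) && inDegree.merge(dst, -1, Integer::sum) == 0) {
              ready.add(dst);
            }
          }
        }
        return sorted;  // nodes on a cycle, and anything behind one, are omitted
      }
    }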
http://git-wip-us.apache.org/repos/asf/samza/blob/b70ee983/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
deleted file mode 100644
index 2f89d91..0000000
--- a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestProcessorGraph {
-
-  ProcessorGraph graph1;
-  ProcessorGraph graph2;
-  ProcessorGraph graph3;
-  ProcessorGraph graph4;
-  int streamSeq = 0;
-
-  private StreamSpec genStream() {
-    ++streamSeq;
-
-    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
-  }
-
-  @Before
-  public void setup() {
-    /**
-     * graph1 is the example graph from wikipedia
-     *
-     * 5   7   3
-     * | / | / |
-     * v   v   |
-     * 11  8   |
-     * | \X   /
-     * v v \v
-     * 2 9 10
-     */
-    // init graph1
-    graph1 = new ProcessorGraph(null);
-    graph1.addSource(genStream(), "5");
-    graph1.addSource(genStream(), "7");
-    graph1.addSource(genStream(), "3");
-    graph1.addIntermediateStream(genStream(), "5", "11");
-    graph1.addIntermediateStream(genStream(), "7", "11");
-    graph1.addIntermediateStream(genStream(), "7", "8");
-    graph1.addIntermediateStream(genStream(), "3", "8");
-    graph1.addIntermediateStream(genStream(), "11", "2");
-    graph1.addIntermediateStream(genStream(), "11", "9");
-    graph1.addIntermediateStream(genStream(), "8", "9");
-    graph1.addIntermediateStream(genStream(), "11", "10");
-    graph1.addSink(genStream(), "2");
-    graph1.addSink(genStream(), "9");
-    graph1.addSink(genStream(), "10");
-
-    /**
-     * graph2 is a graph with a loop
-     * 1 -> 2 -> 3 -> 4 -> 5 -> 7
-     *      |<---6 <--|    <>
-     */
-    graph2 = new ProcessorGraph(null);
-    graph2.addSource(genStream(), "1");
-    graph2.addIntermediateStream(genStream(), "1", "2");
-    graph2.addIntermediateStream(genStream(), "2", "3");
-    graph2.addIntermediateStream(genStream(), "3", "4");
-    graph2.addIntermediateStream(genStream(), "4", "5");
-    graph2.addIntermediateStream(genStream(), "4", "6");
-    graph2.addIntermediateStream(genStream(), "6", "2");
-    graph2.addIntermediateStream(genStream(), "5", "5");
-    graph2.addIntermediateStream(genStream(), "5", "7");
-    graph2.addSink(genStream(), "7");
-
-    /**
-     * graph3 is a graph with self loops
-     * 1<->1 -> 2<->2
-     */
-    graph3 = new ProcessorGraph(null);
-    graph3.addSource(genStream(), "1");
-    graph3.addIntermediateStream(genStream(), "1", "1");
-    graph3.addIntermediateStream(genStream(), "1", "2");
-    graph3.addIntermediateStream(genStream(), "2", "2");
-
-    /**
-     * graph4 is a graph of single-loop node
-     * 1<->1
-     */
-    graph4 = new ProcessorGraph(null);
-    graph4.addSource(genStream(), "1");
-    graph4.addIntermediateStream(genStream(), "1", "1");
-  }
-
-  @Test
-  public void testAddSource() {
-    ProcessorGraph graph = new ProcessorGraph(null);
-
-    /**
-     * s1 -> 1
-     * s2 ->|
-     *
-     * s3 -> 2
-     *   |-> 3
-     */
-    StreamSpec s1 = genStream();
-    StreamSpec s2 = genStream();
-    StreamSpec s3 = genStream();
-    graph.addSource(s1, "1");
-    graph.addSource(s2, "1");
-    graph.addSource(s3, "2");
-    graph.addSource(s3, "3");
-
-    assertTrue(graph.getSources().size() == 3);
-
-    assertTrue(graph.getOrCreateProcessor("1").getInEdges().size() == 2);
-    assertTrue(graph.getOrCreateProcessor("2").getInEdges().size() == 1);
-    assertTrue(graph.getOrCreateProcessor("3").getInEdges().size() == 1);
-
-    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
-  }
-
-  @Test
-  public void testAddSink() {
-    /**
-     * 1 -> s1
-     * 2 -> s2
-     * 2 -> s3
-     */
-    StreamSpec s1 = genStream();
-    StreamSpec s2 = genStream();
-    StreamSpec s3 = genStream();
-    ProcessorGraph graph = new ProcessorGraph(null);
-    graph.addSink(s1, "1");
-    graph.addSink(s2, "2");
-    graph.addSink(s3, "2");
-
-    assertTrue(graph.getSinks().size() == 3);
-    assertTrue(graph.getOrCreateProcessor("1").getOutEdges().size() == 1);
-    assertTrue(graph.getOrCreateProcessor("2").getOutEdges().size() == 2);
-
-    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
-  }
-
-  @Test
-  public void testReachable() {
-    Set<ProcessorNode> reachable1 = graph1.findReachable();
-    assertTrue(reachable1.size() == 8);
-
-    Set<ProcessorNode> reachable2 = graph2.findReachable();
-    assertTrue(reachable2.size() == 7);
-  }
-
-  @Test
-  public void testTopologicalSort() {
-
-    // test graph1
-    List<ProcessorNode> sortedNodes1 = graph1.topologicalSort();
-    Map<String, Integer> idxMap1 = new HashMap<>();
-    for (int i = 0; i < sortedNodes1.size(); i++) {
-      idxMap1.put(sortedNodes1.get(i).getId(), i);
-    }
-
-    assertTrue(idxMap1.size() == 8);
-    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
-    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
-    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
-    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
-    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
-    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
-    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
-    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
-    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
-
-    // test graph2
-    List<ProcessorNode> sortedNodes2 = graph2.topologicalSort();
-    Map<String, Integer> idxMap2 = new HashMap<>();
-    for (int i = 0; i < sortedNodes2.size(); i++) {
-      idxMap2.put(sortedNodes2.get(i).getId(), i);
-    }
-
-    assertTrue(idxMap2.size() == 7);
-    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
-    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
-
-    //test graph3
-    List<ProcessorNode> sortedNodes3 = graph3.topologicalSort();
-    assertTrue(sortedNodes3.size() == 2);
-    assertEquals(sortedNodes3.get(0).getId(), "1");
-    assertEquals(sortedNodes3.get(1).getId(), "2");
-
-    //test graph4
-    List<ProcessorNode> sortedNodes4 = graph4.topologicalSort();
-    assertTrue(sortedNodes4.size() == 1);
-    assertEquals(sortedNodes4.get(0).getId(), "1");
-  }
-}