You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/02 17:09:36 UTC

[4/4] samza git commit: SAMZA-1204: Visualize StreamGraph and ExecutionPlan

SAMZA-1204: Visualize StreamGraph and ExecutionPlan

Once a Samza application (using the fluent API) is deployed, the ExecutionPlanner generates an execution plan. The plan JSON is written to a file (plan.json) under the ./plan directory, which also contains plan.html and the JavaScript files it uses (in the js folder).

Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: xinyuiscool <xi...@gmail.com>

Reviewers: Jake Maes <jm...@apache.org>

Closes #127 from xinyuiscool/SAMZA-1204


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b71b253d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b71b253d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b71b253d

Branch: refs/heads/master
Commit: b71b253d2ad6bf1dc68c7bc6bb2e30782d86c7ff
Parents: ad1f161
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue May 2 10:09:22 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue May 2 10:09:22 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   3 +
 .../apache/samza/execution/ExecutionPlan.java   |  11 +-
 .../samza/execution/ExecutionPlanner.java       |  10 +-
 .../org/apache/samza/execution/JobGraph.java    |  45 +++--
 .../samza/execution/JobGraphJsonGenerator.java  | 199 ++++++++++---------
 .../org/apache/samza/execution/JobNode.java     |  10 +-
 .../samza/operators/MessageStreamImpl.java      |  30 +--
 .../samza/operators/impl/RootOperatorImpl.java  |   5 +
 .../samza/operators/spec/OperatorSpec.java      |   6 +
 .../operators/spec/PartialJoinOperatorSpec.java |   8 +
 .../samza/operators/spec/SinkOperatorSpec.java  |   8 +
 .../operators/spec/StreamOperatorSpec.java      |   8 +
 .../operators/spec/WindowOperatorSpec.java      |   8 +
 .../samza/operators/util/OperatorJsonUtils.java |  89 +++++++++
 .../runtime/AbstractApplicationRunner.java      |  27 +++
 .../samza/runtime/LocalApplicationRunner.java   |   1 +
 .../samza/runtime/RemoteApplicationRunner.java  |   1 +
 .../samza/config/ShellCommandConfig.scala       |   5 +
 .../samza/execution/TestExecutionPlanner.java   |  15 +-
 .../apache/samza/execution/TestJobGraph.java    |  82 ++++----
 .../execution/TestJobGraphJsonGenerator.java    |   4 +-
 .../samza/operators/impl/TestOperatorImpl.java  |   5 +
 .../samza/operators/spec/TestOperatorSpecs.java |   6 +-
 samza-shell/src/main/assembly/src.xml           |   8 +
 samza-shell/src/main/bash/run-app.sh            |   9 +
 samza-shell/src/main/visualizer/js/d3.v3.min.js |   5 +
 .../src/main/visualizer/js/dagre-d3.min.js      |  28 +++
 .../src/main/visualizer/js/planToDagre.js       |  91 +++++++++
 samza-shell/src/main/visualizer/plan.html       | 118 +++++++++++
 29 files changed, 651 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index dc56077..74e5161 100644
--- a/build.gradle
+++ b/build.gradle
@@ -71,6 +71,8 @@ rat {
     '**/non-responsive.less',
     '**/ropa-sans.css',
     '**/syntax.css',
+    '**/d3.v3.min.js',
+    '**/dagre-d3.min.js',
     '.idea/**',
     '.reviewboardrc',
     'docs/_site/**',
@@ -396,6 +398,7 @@ project(":samza-shell") {
     classifier = 'dist'
     from 'src/main/bash'
     from 'src/main/resources'
+    from 'src/main/visualizer'
   }
 
   artifacts {

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
index 6e2b4c6..bde9bfb 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
@@ -20,6 +20,7 @@
 package org.apache.samza.execution;
 
 import java.util.List;
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.system.StreamSpec;
 
@@ -28,10 +29,11 @@ import org.apache.samza.system.StreamSpec;
  * This interface represents Samza {@link org.apache.samza.application.StreamApplication}
  * plans for physical execution.
  */
+@InterfaceStability.Unstable
 public interface ExecutionPlan {
 
   /**
-   * Returns the configs for single stage job, in the order of topologically sort.
+   * Returns the configs for single stage job, in topological sort order.
    * @return list of job configs
    */
   List<JobConfig> getJobConfigs();
@@ -43,9 +45,10 @@ public interface ExecutionPlan {
   List<StreamSpec> getIntermediateStreams();
 
   /**
-   * Returns the JSON representation of the plan for visualization
-   * @return json string
-   * @throws Exception exception
+   * Returns the JSON representation of the plan.
+   * @return JSON string
+   * @throws Exception exception during JSON serialization, including {@link java.io.IOException}
+   *                   and {@link org.codehaus.jackson.JsonGenerationException}
    */
   String getPlanAsJson() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index ac39eb8..d763d84 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -61,6 +61,9 @@ public class ExecutionPlanner {
     // create physical job graph based on stream graph
     JobGraph jobGraph = createJobGraph(streamGraph);
 
+    // fetch the external streams partition info
+    updateExistingPartitions(jobGraph, streamManager);
+
     if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
       // figure out the partitions for internal streams
       calculatePartitions(streamGraph, jobGraph);
@@ -84,7 +87,7 @@ public class ExecutionPlanner {
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
     String jobId = config.get(JobConfig.JOB_ID(), "1");
-    JobNode node = jobGraph.getOrCreateNode(jobName, jobId, streamGraph);
+    JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph);
 
     // add sources
     sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
@@ -104,9 +107,6 @@ public class ExecutionPlanner {
    * Figure out the number of partitions of all streams
    */
   /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
-    // fetch the external streams partition info
-    updateExistingPartitions(jobGraph, streamManager);
-
     // calculate the partitions for the input streams of join operators
     calculateJoinInputPartitions(streamGraph, jobGraph);
 
@@ -167,7 +167,7 @@ public class ExecutionPlanner {
     Set<OperatorSpec> visited = new HashSet<>();
 
     streamGraph.getInputStreams().entrySet().forEach(entry -> {
-        StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
+        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
             outputStreamToJoinSpec, joinQ, visited);

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index ff5fbdf..35f27ab 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -65,40 +66,46 @@ import org.slf4j.LoggerFactory;
     this.config = config;
   }
 
-  /**
-   * Returns the configs for single stage job, in the order of topologically sort.
-   * @return list of job configs
-   */
+  @Override
   public List<JobConfig> getJobConfigs() {
-    return getJobNodes().stream().map(JobNode::generateConfig).collect(Collectors.toList());
+    String json = "";
+    try {
+      json = getPlanAsJson();
+    } catch (Exception e) {
+      log.warn("Failed to generate plan JSON", e);
+    }
+
+    final String planJson = json;
+    return getJobNodes().stream().map(n -> n.generateConfig(planJson)).collect(Collectors.toList());
   }
 
-  /**
-   * Returns the intermediate streams that need to be created.
-   * @return intermediate {@link StreamSpec}s
-   */
+  @Override
   public List<StreamSpec> getIntermediateStreams() {
     return getIntermediateStreamEdges().stream()
         .map(streamEdge -> streamEdge.getStreamSpec())
         .collect(Collectors.toList());
   }
 
-  /**
-   * Returns the JSON representation of the plan for visualization
-   * @return json string
-   * @throws Exception
-   */
+  @Override
   public String getPlanAsJson() throws Exception {
     return jsonGenerator.toJson(this);
   }
 
   /**
+   * Returns the config for this application
+   * @return {@link ApplicationConfig}
+   */
+  public ApplicationConfig getApplicationConfig() {
+    return new ApplicationConfig(config);
+  }
+
+  /**
    * Add a source stream to a {@link JobNode}
    * @param input source stream
    * @param node the job node that consumes from the source
    */
   void addSource(StreamSpec input, JobNode node) {
-    StreamEdge edge = getOrCreateEdge(input);
+    StreamEdge edge = getOrCreateStreamEdge(input);
     edge.addTargetNode(node);
     node.addInEdge(edge);
     sources.add(edge);
@@ -110,7 +117,7 @@ import org.slf4j.LoggerFactory;
    * @param node the job node that outputs to the sink
    */
   void addSink(StreamSpec output, JobNode node) {
-    StreamEdge edge = getOrCreateEdge(output);
+    StreamEdge edge = getOrCreateStreamEdge(output);
     edge.addSourceNode(node);
     node.addOutEdge(edge);
     sinks.add(edge);
@@ -123,7 +130,7 @@ import org.slf4j.LoggerFactory;
    * @param to the target node
    */
   void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
-    StreamEdge edge = getOrCreateEdge(streamSpec);
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addSourceNode(from);
     edge.addTargetNode(to);
     from.addOutEdge(edge);
@@ -137,7 +144,7 @@ import org.slf4j.LoggerFactory;
    * @param jobId id of the job
    * @return
    */
-  JobNode getOrCreateNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
+  JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
     String nodeId = JobNode.createId(jobName, jobId);
     JobNode node = nodes.get(nodeId);
     if (node == null) {
@@ -152,7 +159,7 @@ import org.slf4j.LoggerFactory;
    * @param streamSpec spec of the StreamEdge
    * @return stream edge
    */
-  StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
+  StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) {
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 317616c..96c0538 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -19,95 +19,99 @@
 
 package org.apache.samza.execution;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.util.OperatorJsonUtils;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
+
 /**
  * This class generates the JSON representation of the {@link JobGraph}.
  */
-public class JobGraphJsonGenerator {
-
-  /**
-   * This class provides the necessary connection of operators for traversal.
-   */
-  static abstract class Traversable {
-    @JsonProperty("NextOperatorIds")
-    Set<Integer>  nextOperatorIds = new HashSet<>();
-  }
-
-  static final class OperatorJson extends Traversable {
-    @JsonProperty("OpCode")
-    String opCode;
-    @JsonProperty("OpId")
-    int opId;
-    @JsonProperty("OutputStreamId")
-    String outputStreamId;
-    @JsonProperty("PairedOpId")
-    int pairedOpId = -1;  //for join operator, we will have a pair nodes for two partial joins
-  }
+/* package private */ class JobGraphJsonGenerator {
 
   static final class StreamSpecJson {
-    @JsonProperty("Id")
+    @JsonProperty("id")
     String id;
-    @JsonProperty("SystemName")
+    @JsonProperty("systemName")
     String systemName;
-    @JsonProperty("PhysicalName")
+    @JsonProperty("physicalName")
     String physicalName;
-    @JsonProperty("PartitionCount")
+    @JsonProperty("partitionCount")
     int partitionCount;
   }
 
   static final class StreamEdgeJson {
-    @JsonProperty("StreamSpec")
+    @JsonProperty("streamSpec")
     StreamSpecJson streamSpec;
+    @JsonProperty("sourceJobs")
+    List<String> sourceJobs;
+    @JsonProperty("targetJobs")
+    List<String> targetJobs;
   }
 
   static final class OperatorGraphJson {
-    @JsonProperty("InputStreams")
-    List<InputStreamJson> inputStreams;
-    @JsonProperty("Operators")
-    Map<Integer, OperatorJson> operators = new HashMap<>();
+    @JsonProperty("inputStreams")
+    List<StreamJson> inputStreams;
+    @JsonProperty("outputStreams")
+    List<StreamJson> outputStreams;
+    @JsonProperty("operators")
+    Map<Integer, Map<String, Object>> operators = new HashMap<>();
+    @JsonProperty("canonicalOpIds")
+    Map<Integer, String> canonicalOpIds = new HashMap<>();
   }
 
-  static final class InputStreamJson extends Traversable {
-    @JsonProperty("StreamId")
+  static final class StreamJson {
+    @JsonProperty("streamId")
     String streamId;
+    @JsonProperty("nextOperatorIds")
+    Set<Integer>  nextOperatorIds = new HashSet<>();
   }
 
   static final class JobNodeJson {
-    @JsonProperty("JobName")
+    @JsonProperty("jobName")
     String jobName;
-    @JsonProperty("JobId")
+    @JsonProperty("jobId")
     String jobId;
-    @JsonProperty("OperatorGraph")
+    @JsonProperty("operatorGraph")
     OperatorGraphJson operatorGraph;
   }
 
   static final class JobGraphJson {
-    @JsonProperty("Jobs")
+    @JsonProperty("jobs")
     List<JobNodeJson> jobs;
-    @JsonProperty("Streams")
-    Map<String, StreamEdgeJson> streams;
+    @JsonProperty("sourceStreams")
+    Map<String, StreamEdgeJson> sourceStreams;
+    @JsonProperty("sinkStreams")
+    Map<String, StreamEdgeJson> sinkStreams;
+    @JsonProperty("intermediateStreams")
+    Map<String, StreamEdgeJson> intermediateStreams;
+    @JsonProperty("applicationName")
+    String applicationName;
+    @JsonProperty("applicationId")
+    String applicationId;
   }
 
-  // Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they
-  // will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join
-  // (who register first in the map wins).
-  Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>();
+  // Mapping from the output stream to the ids.
+  // Logically they belong to the same operator, but in code we generate one operator for each input.
+  // This is to associate the operators that output to the same MessageStream.
+  Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create();
 
   /**
    * Returns the JSON representation of a {@link JobGraph}
@@ -119,13 +123,18 @@ public class JobGraphJsonGenerator {
     JobGraphJson jobGraphJson = new JobGraphJson();
 
     // build StreamEdge JSON
-    jobGraphJson.streams = new HashMap<>();
-    jobGraph.getSources().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
-    jobGraph.getSinks().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
-    jobGraph.getIntermediateStreamEdges().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
+    ApplicationConfig appConfig = jobGraph.getApplicationConfig();
+    jobGraphJson.applicationName = appConfig.getAppName();
+    jobGraphJson.applicationId = appConfig.getAppId();
+    jobGraphJson.sourceStreams = new HashMap<>();
+    jobGraphJson.sinkStreams = new HashMap<>();
+    jobGraphJson.intermediateStreams = new HashMap<>();
+    jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
+    jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
+    jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
 
     jobGraphJson.jobs = jobGraph.getJobNodes().stream()
-        .map(jobNode -> buildJobNodeJson(jobNode, jobGraphJson.streams))
+        .map(jobNode -> buildJobNodeJson(jobNode))
         .collect(Collectors.toList());
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -137,10 +146,9 @@ public class JobGraphJsonGenerator {
   /**
    * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job
    * @param jobNode job node in the {@link JobGraph}
-   * @param streamEdges map of {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
    * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
    */
-  private JobNodeJson buildJobNodeJson(JobNode jobNode, Map<String, StreamEdgeJson> streamEdges) {
+  private JobNodeJson buildJobNodeJson(JobNode jobNode) {
     JobNodeJson job = new JobNodeJson();
     job.jobName = jobNode.getJobName();
     job.jobId = jobNode.getJobId();
@@ -157,10 +165,27 @@ public class JobGraphJsonGenerator {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
     jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> {
-        InputStreamJson inputJson = new InputStreamJson();
-        inputJson.streamId = streamSpec.getId();
+        StreamJson inputJson = new StreamJson();
         opGraph.inputStreams.add(inputJson);
-        updateOperatorGraphJson((MessageStreamImpl) stream, inputJson, opGraph);
+        inputJson.streamId = streamSpec.getId();
+        Collection<OperatorSpec> specs = ((MessageStreamImpl) stream).getRegisteredOperatorSpecs();
+        inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
+
+        updateOperatorGraphJson((MessageStreamImpl) stream, opGraph);
+
+        for (Map.Entry<MessageStream, Collection<Integer>> entry : outputStreamToOpIds.asMap().entrySet()) {
+          List<Integer> sortedIds = new ArrayList<>(entry.getValue());
+          Collections.sort(sortedIds);
+          String canonicalId = Joiner.on(',').join(sortedIds);
+          sortedIds.stream().forEach(id -> opGraph.canonicalOpIds.put(id, canonicalId));
+        }
+      });
+
+    opGraph.outputStreams = new ArrayList<>();
+    jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+        StreamJson outputJson = new StreamJson();
+        outputJson.streamId = streamSpec.getId();
+        opGraph.outputStreams.add(outputJson);
       });
     return opGraph;
   }
@@ -168,65 +193,30 @@ public class JobGraphJsonGenerator {
   /**
    * Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO.
    * @param messageStream input
-   * @param parent parent node in the traveral
    * @param opGraph operator graph to build
    */
-  private void updateOperatorGraphJson(MessageStreamImpl messageStream, Traversable parent, OperatorGraphJson opGraph) {
+  private void updateOperatorGraphJson(MessageStreamImpl messageStream, OperatorGraphJson opGraph) {
     Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs();
     specs.forEach(opSpec -> {
-        parent.nextOperatorIds.add(opSpec.getOpId());
+        opGraph.operators.put(opSpec.getOpId(), OperatorJsonUtils.operatorToMap(opSpec));
 
-        OperatorJson opJson = getOrCreateOperatorJson(opSpec, opGraph);
-        if (opSpec instanceof SinkOperatorSpec) {
-          opJson.outputStreamId = ((SinkOperatorSpec) opSpec).getOutputStream().getStreamSpec().getId();
-        } else if (opSpec.getNextStream() != null) {
-          updateOperatorGraphJson(opSpec.getNextStream(), opJson, opGraph);
+        if (opSpec.getOpCode() == OperatorSpec.OpCode.JOIN || opSpec.getOpCode() == OperatorSpec.OpCode.MERGE) {
+          outputStreamToOpIds.put(opSpec.getNextStream(), opSpec.getOpId());
         }
-      });
-  }
-
-  /**
-   * Get or create the JSON POJO for an operator.
-   * @param opSpec {@link OperatorSpec}
-   * @param opGraph {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
-   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorJson}
-   */
-  private OperatorJson getOrCreateOperatorJson(OperatorSpec opSpec, OperatorGraphJson opGraph) {
-    Map<Integer, OperatorJson> operators = opGraph.operators;
-    OperatorJson opJson = operators.get(opSpec.getOpId());
-    if (opJson == null) {
-      opJson = new OperatorJson();
-      opJson.opCode = opSpec.getOpCode().name();
-      opJson.opId = opSpec.getOpId();
-      operators.put(opSpec.getOpId(), opJson);
-    }
 
-    if (opSpec instanceof PartialJoinOperatorSpec) {
-      // every join will have two partial join operators
-      // we will choose one of them in order to consolidate the inputs
-      // the first one who registered with the outputStreamToJoinSpec will win
-      MessageStream output = opSpec.getNextStream();
-      OperatorSpec joinSpec = outputStreamToJoinSpec.get(output);
-      if (joinSpec == null) {
-        joinSpec = opSpec;
-        outputStreamToJoinSpec.put(output, joinSpec);
-      } else if (joinSpec != opSpec) {
-        OperatorJson joinNode = operators.get(joinSpec.getOpId());
-        joinNode.pairedOpId = opJson.opId;
-        opJson.pairedOpId = joinNode.opId;
-      }
-    }
-
-    return opJson;
+        if (opSpec.getNextStream() != null) {
+          updateOperatorGraphJson(opSpec.getNextStream(), opGraph);
+        }
+      });
   }
 
   /**
    * Get or create the JSON POJO for a {@link StreamEdge}
    * @param edge {@link StreamEdge}
    * @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
-   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
+   * @return JSON representation of the {@link StreamEdge}
    */
-  private StreamEdgeJson getOrCreateStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) {
+  private StreamEdgeJson buildStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) {
     String streamId = edge.getStreamSpec().getId();
     StreamEdgeJson edgeJson = streamEdges.get(streamId);
     if (edgeJson == null) {
@@ -237,6 +227,19 @@ public class JobGraphJsonGenerator {
       streamSpecJson.physicalName = edge.getStreamSpec().getPhysicalName();
       streamSpecJson.partitionCount = edge.getPartitionCount();
       edgeJson.streamSpec = streamSpecJson;
+
+      List<String> sourceJobs = new ArrayList<>();
+      edge.getSourceNodes().forEach(jobNode -> {
+          sourceJobs.add(jobNode.getJobName());
+        });
+      edgeJson.sourceJobs = sourceJobs;
+
+      List<String> targetJobs = new ArrayList<>();
+      edge.getTargetNodes().forEach(jobNode -> {
+          targetJobs.add(jobNode.getJobName());
+        });
+      edgeJson.targetJobs = targetJobs;
+
       streamEdges.put(streamId, edgeJson);
     }
     return edgeJson;

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index e19c9ca..0484cf9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 public class JobNode {
   private static final Logger log = LoggerFactory.getLogger(JobNode.class);
   private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
+  private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
 
   private final String jobName;
   private final String jobId;
@@ -92,7 +93,12 @@ public class JobNode {
     return outEdges;
   }
 
-  public JobConfig generateConfig() {
+  /**
+   * Generate the configs for a job
+   * @param executionPlanJson JSON representation of the execution plan
+   * @return config of the job
+   */
+  public JobConfig generateConfig(String executionPlanJson) {
     Map<String, String> configs = new HashMap<>();
     configs.put(JobConfig.JOB_NAME(), jobName);
 
@@ -100,6 +106,8 @@ public class JobNode {
     configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
     log.info("Job {} has generated configs {}", jobName, configs);
 
+    configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
+
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
     // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
     return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix)));

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 69a41db..b9adeed 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -81,16 +81,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
-    OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(
-        filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
+    OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, new MessageStreamImpl<>(this.graph),
+        this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(
-        flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
+    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, new MessageStreamImpl<>(this.graph),
+        this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
@@ -103,15 +103,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
-    SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
-        (OutputStreamInternal<K, V, M>) outputStream, this.graph.getNextOpId());
+    SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamInternal<K, V, M>) outputStream,
+        this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
   }
 
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
-    OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec(
-        (WindowInternal<M, K, WV>) window, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
+    OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
+        new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(wndOp);
     return wndOp.getNextStream();
   }
@@ -175,9 +175,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
     this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
         thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
 
-    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs
-        .add(OperatorSpecs.createPartialJoinOperatorSpec(
-            otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
+    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs
+        .createPartialJoinOperatorSpec(otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream,
+            this.graph.getNextOpId()));
 
     return nextStream;
   }
@@ -187,8 +187,13 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
     MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
 
     otherStreams.add(this);
-    otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
-        add(OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId())));
+    // Use a plain loop rather than forEach(lambda): OperatorJsonUtils.getSourceLocation() reads a fixed
+    // stack frame, and the extra lambda/forEach frames would make it report a JDK frame, not user code.
+    for (MessageStream<M> other : otherStreams) {
+      OperatorSpec mergeOperatorSpec =
+          OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId());
+      ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(mergeOperatorSpec);
+    }
     return nextStream;
   }
 
@@ -213,5 +218,4 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
     return Collections.unmodifiableSet(this.registeredOperatorSpecs);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
index 0f18e97..059b567 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -62,6 +62,11 @@ public final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
       public int getOpId() {
         return -1;
       }
+
+      @Override
+      public String getSourceLocation() {
+        return ""; // internal root spec (opId -1): no user source location
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index cc3c4ab..3ea52ca 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -63,6 +63,12 @@ public interface OperatorSpec<OM> {
   int getOpId();
 
   /**
+   * Get the location of the user code that created this operator, e.g. {@code "MyApp.java:42"}.
+   * @return  the source location, or an empty string when none is available
+   */
+  String getSourceLocation();
+
+  /**
    * Get the name for this operator based on its opCode and opId.
    * @return  the name for this operator
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index e85626f..92b4170 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.util.OperatorJsonUtils;
 
 
 /**
@@ -38,6 +39,7 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
   private final long ttlMs;
   private final MessageStreamImpl<RM> nextStream;
   private final int opId;
+  private final String sourceLocation; // location of the user code that created this operator
 
   /**
    * Default constructor for a {@link PartialJoinOperatorSpec}.
@@ -58,6 +60,7 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
     this.ttlMs = ttlMs;
     this.nextStream = nextStream;
     this.opId = opId;
+    this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // relies on being called from the constructor
   }
 
   @Override
@@ -86,4 +89,9 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
   public int getOpId() {
     return this.opId;
   }
+
+  @Override
+  public String getSourceLocation() {
+    return sourceLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 0d135d3..afdd6b9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.spec;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.operators.util.OperatorJsonUtils;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
@@ -39,6 +40,7 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
   private OutputStreamInternal<?, ?, M> outputStream; // may be null
   private final OperatorSpec.OpCode opCode;
   private final int opId;
+  private final String sourceLocation; // location of the user code that created this operator
 
   /**
    * Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
@@ -54,6 +56,7 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
     this.sinkFn = sinkFn;
     this.opCode = opCode;
     this.opId = opId;
+    this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // relies on being called from the constructor
   }
 
   /**
@@ -99,6 +102,11 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
     return this.opId;
   }
 
+  @Override
+  public String getSourceLocation() {
+    return sourceLocation;
+  }
+
   /**
    * Creates a {@link SinkFunction} to send messages to the provided {@code output}.
    * @param outputStream  the {@link OutputStreamInternal} to send messages to

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index 204e566..c53efae 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.util.OperatorJsonUtils;
 
 
 /**
@@ -34,6 +35,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
   private final MessageStreamImpl<OM> nextStream;
   private final OperatorSpec.OpCode opCode;
   private final int opId;
+  private final String sourceLocation; // location of the user code that created this operator
 
   /**
    * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
@@ -49,6 +51,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
     this.nextStream = nextStream;
     this.opCode = opCode;
     this.opId = opId;
+    this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // relies on being called from the constructor
   }
 
   @Override
@@ -69,4 +72,9 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
   public int getOpId() {
     return this.opId;
   }
+
+  @Override
+  public String getSourceLocation() {
+    return sourceLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 73b17b5..7ea07f6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -20,6 +20,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.util.OperatorJsonUtils;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 
@@ -36,6 +37,7 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
   private final WindowInternal<M, WK, WV> window;
   private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
   private final int opId;
+  private final String sourceLocation; // location of the user code that created this operator
 
   /**
    * Constructor for {@link WindowOperatorSpec}.
@@ -48,6 +50,7 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
     this.nextStream = nextStream;
     this.window = window;
     this.opId = opId;
+    this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // relies on being called from the constructor
   }
 
   @Override
@@ -68,4 +71,9 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
   public int getOpId() {
     return this.opId;
   }
+
+  @Override
+  public String getSourceLocation() {
+    return sourceLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
new file mode 100644
index 0000000..b52fbc3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for describing operators in the JSON execution plan.
+ */
+public class OperatorJsonUtils {
+  private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class);
+
+  private static final String OP_CODE = "opCode";
+  private static final String OP_ID = "opId";
+  private static final String SOURCE_LOCATION = "sourceLocation";
+  private static final String NEXT_OPERATOR_IDS = "nextOperatorIds";
+  private static final String OUTPUT_STREAM_ID = "outputStreamId";
+  private static final String TTL_MS = "ttlMs";
+
+  /**
+   * Index of the user-code frame in {@link Thread#getStackTrace()} when {@link #getSourceLocation()}
+   * is called from an operator spec constructor (see the frame layout in that method).
+   */
+  private static final int USER_CODE_FRAME = 5;
+
+  /**
+   * Format the operator properties into a map.
+   * @param spec a {@link OperatorSpec} instance
+   * @return map of the operator properties: op code, op id, source location, ids of the next operators,
+   *         plus the output stream id for sinks that have one and the TTL for partial joins
+   */
+  public static Map<String, Object> operatorToMap(OperatorSpec spec) {
+    Map<String, Object> map = new HashMap<>();
+    map.put(OP_CODE, spec.getOpCode().name());
+    map.put(OP_ID, spec.getOpId());
+    map.put(SOURCE_LOCATION, spec.getSourceLocation());
+
+    if (spec.getNextStream() != null) {
+      Collection<OperatorSpec> nextOperators = spec.getNextStream().getRegisteredOperatorSpecs();
+      map.put(NEXT_OPERATOR_IDS, nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
+    } else {
+      map.put(NEXT_OPERATOR_IDS, Collections.emptySet());
+    }
+
+    if (spec instanceof SinkOperatorSpec) {
+      SinkOperatorSpec sinkSpec = (SinkOperatorSpec) spec;
+      // The output stream of a SinkOperatorSpec may be null (e.g. a sink built from a plain
+      // SinkFunction), in which case there is no output stream id to report.
+      if (sinkSpec.getOutputStream() != null) {
+        map.put(OUTPUT_STREAM_ID, sinkSpec.getOutputStream().getStreamSpec().getId());
+      }
+    }
+
+    if (spec instanceof PartialJoinOperatorSpec) {
+      map.put(TTL_MS, ((PartialJoinOperatorSpec) spec).getTtlMs());
+    }
+
+    return map;
+  }
+
+  /**
+   * Return the location of source code that creates the operator.
+   * This function must be invoked directly from the constructor of each operator spec, since it
+   * locates the user code at a fixed depth in the current stack.
+   * @return formatted source location ("file:line"), or an empty string if the stack is too shallow
+   */
+  public static String getSourceLocation() {
+    // The stack trace looks like:
+    // [0] Thread.getStackTrace()
+    // [1] OperatorJsonUtils.getSourceLocation()
+    // [2] SomeOperator.<init>()
+    // [3] OperatorSpecs.createSomeOperator()
+    // [4] MessageStreamImpl.someOperator()
+    // [5] User code that calls [4]
+    // we are only interested in [5] here
+    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    // The JVM may omit frames (getStackTrace() may even return an empty array), so guard the index.
+    if (stack.length <= USER_CODE_FRAME) {
+      return "";
+    }
+    StackTraceElement location = stack[USER_CODE_FRAME];
+    return String.format("%s:%s", location.getFileName(), location.getLineNumber());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 692dc38..3c7c83d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -18,22 +18,28 @@
  */
 package org.apache.samza.runtime;
 
+import java.io.File;
+import java.io.PrintWriter;
 import java.util.Map;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Defines common, core behavior for implementations of the {@link ApplicationRunner} API
  */
 public abstract class AbstractApplicationRunner extends ApplicationRunner {
+  private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
 
   private final StreamManager streamManager;
   private final ExecutionPlanner planner;
@@ -106,4 +112,34 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   final StreamManager getStreamManager() {
     return streamManager;
   }
+
+  /**
+   * Write the execution plan JSON to {@code plan.json} in the directory named by the
+   * {@code EXECUTION_PLAN_DIR} environment variable; nothing is written if that variable is unset
+   * or empty. The file content is the JSON wrapped as {@code plan='<json>'}.
+   * This is best-effort: any failure is logged as a warning and never propagated to the caller.
+   * @param planJson JSON representation of the plan
+   */
+  final void writePlanJsonFile(String planJson) {
+    try {
+      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
+      if (planPath == null || planPath.isEmpty()) {
+        return;
+      }
+      File file = new File(planPath, "plan.json");
+      // NOTE(review): planJson is embedded in a single-quoted string without escaping, so a quote or
+      // backslash in it could break a consumer that evaluates this file as JavaScript; confirm.
+      try (PrintWriter writer = new PrintWriter(file, "UTF-8")) {
+        writer.println("plan='" + planJson + "'");
+        // PrintWriter swallows IOExceptions; checkError() flushes and reports them.
+        if (writer.checkError()) {
+          throw new IllegalStateException("Error while writing " + file);
+        }
+      }
+      // setReadable() has no effect on a file that does not exist yet, so call it after writing.
+      file.setReadable(true, false);
+    } catch (Exception e) {
+      log.warn("Failed to write execution plan json to file", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 5e83c3c..bff0f1c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -110,6 +110,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     try {
       // 1. initialize and plan
       ExecutionPlan plan = getExecutionPlan(app);
+      writePlanJsonFile(plan.getPlanAsJson()); // best-effort: write failures are logged, not thrown
 
       // 2. create the necessary streams
       createStreams(plan.getIntermediateStreams());

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 38eb195..d5f6e21 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -50,6 +50,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
     try {
       // 1. initialize and plan
       ExecutionPlan plan = getExecutionPlan(app);
+      writePlanJsonFile(plan.getPlanAsJson()); // best-effort: write failures are logged, not thrown
 
       // 2. create the necessary streams
       getStreamManager().createStreams(plan.getIntermediateStreams());

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 1397ed5..3c0f320 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -53,6 +53,11 @@ object ShellCommandConfig {
    */
   val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR"
 
+  /**
+   * Name of the environment variable holding the directory the execution plan (plan.json) is written to
+   */
+  val EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR"
+
   val COMMAND_SHELL_EXECUTE = "task.execute"
   val TASK_JVM_OPTS = "task.opts"
   val TASK_JAVA_HOME = "task.java.home"

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index b7f952a..5366dc3 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -220,11 +220,11 @@ public class TestExecutionPlanner {
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    assertTrue(jobGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
-    assertTrue(jobGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
-    assertTrue(jobGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
-    assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
-    assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
+    assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 64);
+    assertTrue(jobGraph.getOrCreateStreamEdge(input2).getPartitionCount() == 16);
+    assertTrue(jobGraph.getOrCreateStreamEdge(input3).getPartitionCount() == 32);
+    assertTrue(jobGraph.getOrCreateStreamEdge(output1).getPartitionCount() == 8);
+    assertTrue(jobGraph.getOrCreateStreamEdge(output2).getPartitionCount() == 16);
 
     jobGraph.getIntermediateStreamEdges().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == -1);
@@ -264,11 +264,10 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testCalculateIntStreamPartitions() {
+  public void testCalculateIntStreamPartitions() throws Exception {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraphImpl streamGraph = createSimpleGraph();
-    JobGraph jobGraph = planner.createJobGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, jobGraph);
+    JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 4a4498c..bf131ce 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -59,14 +59,14 @@ public class TestJobGraph {
   private void createGraph1() {
     graph1 = new JobGraph(null);
 
-    JobNode n2 = graph1.getOrCreateNode("2", "1", null);
-    JobNode n3 = graph1.getOrCreateNode("3", "1", null);
-    JobNode n5 = graph1.getOrCreateNode("5", "1", null);
-    JobNode n7 = graph1.getOrCreateNode("7", "1", null);
-    JobNode n8 = graph1.getOrCreateNode("8", "1", null);
-    JobNode n9 = graph1.getOrCreateNode("9", "1", null);
-    JobNode n10 = graph1.getOrCreateNode("10", "1", null);
-    JobNode n11 = graph1.getOrCreateNode("11", "1", null);
+    JobNode n2 = graph1.getOrCreateJobNode("2", "1", null);
+    JobNode n3 = graph1.getOrCreateJobNode("3", "1", null);
+    JobNode n5 = graph1.getOrCreateJobNode("5", "1", null);
+    JobNode n7 = graph1.getOrCreateJobNode("7", "1", null);
+    JobNode n8 = graph1.getOrCreateJobNode("8", "1", null);
+    JobNode n9 = graph1.getOrCreateJobNode("9", "1", null);
+    JobNode n10 = graph1.getOrCreateJobNode("10", "1", null);
+    JobNode n11 = graph1.getOrCreateJobNode("11", "1", null);
 
     graph1.addSource(genStream(), n5);
     graph1.addSource(genStream(), n7);
@@ -92,13 +92,13 @@ public class TestJobGraph {
   private void createGraph2() {
     graph2 = new JobGraph(null);
 
-    JobNode n1 = graph2.getOrCreateNode("1", "1", null);
-    JobNode n2 = graph2.getOrCreateNode("2", "1", null);
-    JobNode n3 = graph2.getOrCreateNode("3", "1", null);
-    JobNode n4 = graph2.getOrCreateNode("4", "1", null);
-    JobNode n5 = graph2.getOrCreateNode("5", "1", null);
-    JobNode n6 = graph2.getOrCreateNode("6", "1", null);
-    JobNode n7 = graph2.getOrCreateNode("7", "1", null);
+    JobNode n1 = graph2.getOrCreateJobNode("1", "1", null);
+    JobNode n2 = graph2.getOrCreateJobNode("2", "1", null);
+    JobNode n3 = graph2.getOrCreateJobNode("3", "1", null);
+    JobNode n4 = graph2.getOrCreateJobNode("4", "1", null);
+    JobNode n5 = graph2.getOrCreateJobNode("5", "1", null);
+    JobNode n6 = graph2.getOrCreateJobNode("6", "1", null);
+    JobNode n7 = graph2.getOrCreateJobNode("7", "1", null);
 
     graph2.addSource(genStream(), n1);
     graph2.addIntermediateStream(genStream(), n1, n2);
@@ -119,8 +119,8 @@ public class TestJobGraph {
   private void createGraph3() {
     graph3 = new JobGraph(null);
 
-    JobNode n1 = graph3.getOrCreateNode("1", "1", null);
-    JobNode n2 = graph3.getOrCreateNode("2", "1", null);
+    JobNode n1 = graph3.getOrCreateJobNode("1", "1", null);
+    JobNode n2 = graph3.getOrCreateJobNode("2", "1", null);
 
     graph3.addSource(genStream(), n1);
     graph3.addIntermediateStream(genStream(), n1, n1);
@@ -135,7 +135,7 @@ public class TestJobGraph {
   private void createGraph4() {
     graph4 = new JobGraph(null);
 
-    JobNode n1 = graph4.getOrCreateNode("1", "1", null);
+    JobNode n1 = graph4.getOrCreateJobNode("1", "1", null);
 
     graph4.addSource(genStream(), n1);
     graph4.addIntermediateStream(genStream(), n1, n1);
@@ -160,9 +160,9 @@ public class TestJobGraph {
      * s3 -> 2
      *   |-> 3
      */
-    JobNode n1 = graph.getOrCreateNode("1", "1", null);
-    JobNode n2 = graph.getOrCreateNode("2", "1", null);
-    JobNode n3 = graph.getOrCreateNode("3", "1", null);
+    JobNode n1 = graph.getOrCreateJobNode("1", "1", null);
+    JobNode n2 = graph.getOrCreateJobNode("2", "1", null);
+    JobNode n3 = graph.getOrCreateJobNode("3", "1", null);
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
@@ -173,16 +173,16 @@ public class TestJobGraph {
 
     assertTrue(graph.getSources().size() == 3);
 
-    assertTrue(graph.getOrCreateNode("1", "1", null).getInEdges().size() == 2);
-    assertTrue(graph.getOrCreateNode("2", "1", null).getInEdges().size() == 1);
-    assertTrue(graph.getOrCreateNode("3", "1", null).getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateJobNode("1", "1", null).getInEdges().size() == 2);
+    assertTrue(graph.getOrCreateJobNode("2", "1", null).getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateJobNode("3", "1", null).getInEdges().size() == 1);
 
-    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
+    assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 1);
+    assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 1);
+    assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 2);
   }
 
   @Test
@@ -193,8 +193,8 @@ public class TestJobGraph {
      * 2 -> s3
      */
     JobGraph graph = new JobGraph(null);
-    JobNode n1 = graph.getOrCreateNode("1", "1", null);
-    JobNode n2 = graph.getOrCreateNode("2", "1", null);
+    JobNode n1 = graph.getOrCreateJobNode("1", "1", null);
+    JobNode n2 = graph.getOrCreateJobNode("2", "1", null);
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
@@ -203,15 +203,15 @@ public class TestJobGraph {
     graph.addSink(s3, n2);
 
     assertTrue(graph.getSinks().size() == 3);
-    assertTrue(graph.getOrCreateNode("1", "1", null).getOutEdges().size() == 1);
-    assertTrue(graph.getOrCreateNode("2", "1", null).getOutEdges().size() == 2);
-
-    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateJobNode("1", "1", null).getOutEdges().size() == 1);
+    assertTrue(graph.getOrCreateJobNode("2", "1", null).getOutEdges().size() == 2);
+
+    assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 0);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index c4ab922..2681f9c 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -125,6 +125,8 @@ public class TestJobGraphJsonGenerator {
     JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
     assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
     assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12);
-    assertTrue(nodes.streams.size() == 7);
+    assertTrue(nodes.sourceStreams.size() == 3);
+    assertTrue(nodes.sinkStreams.size() == 2);
+    assertTrue(nodes.intermediateStreams.size() == 2); // 3 + 2 + 2 = 7 streams in total
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index bd18f0b..99bf854 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -217,6 +217,11 @@ public class TestOperatorImpl {
     public int getOpId() {
       return -1;
     }
+
+    @Override
+    public String getSourceLocation() {
+      return ""; // test spec: no user source location
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index d227206..cccafaf 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -57,8 +57,8 @@ public class TestOperatorSpecs {
   @Test
   public void testCreateStreamOperator() {
     FlatMapFunction<Object, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
-          this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
-        } };
+        this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
+      } };
     MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
     StreamOperatorSpec<Object, TestMessageEnvelope> streamOp =
         OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
@@ -78,7 +78,7 @@ public class TestOperatorSpecs {
   public void testCreateSinkOperator() {
     SystemStream testStream = new SystemStream("test-sys", "test-stream");
     SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
-          TaskCoordinator taskCoordinator) -> {
+        TaskCoordinator taskCoordinator) -> {
       messageCollector.send(new OutgoingMessageEnvelope(testStream, message.getKey(), message.getMessage()));
     };
     SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-shell/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/assembly/src.xml b/samza-shell/src/main/assembly/src.xml
index 5173fdf..cc15420 100644
--- a/samza-shell/src/main/assembly/src.xml
+++ b/samza-shell/src/main/assembly/src.xml
@@ -27,5 +27,15 @@
         <include>*</include>
       </includes>
     </fileSet>
+    <!-- Execution plan visualizer files from src/main/visualizer, packaged under visualizer/. -->
+    <!-- NOTE(review): "*" may only match top-level files; confirm whether subdirectories (e.g. js) need "**/*". -->
+    <fileSet>
+      <outputDirectory>visualizer</outputDirectory>
+      <directory>${basedir}/src/main/visualizer</directory>
+      <fileMode>0644</fileMode>
+      <includes>
+        <include>*</include>
+      </includes>
+    </fileSet>
   </fileSets>
 </assembly>

http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-shell/src/main/bash/run-app.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-app.sh b/samza-shell/src/main/bash/run-app.sh
index 3e43463..3880e3c 100644
--- a/samza-shell/src/main/bash/run-app.sh
+++ b/samza-shell/src/main/bash/run-app.sh
@@ -16,6 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# Resolve the absolute path of the parent of this script's directory.
+base_dir=$(cd "$(dirname "$0")/.." && pwd)
+
+# The application runner writes the execution plan JSON (plan.json) into this directory.
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p "$EXECUTION_PLAN_DIR"
+
 [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
 
 exec $(dirname $0)/run-class.sh org.apache.samza.runtime.ApplicationRunnerMain "$@"