Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/03 18:03:45 UTC

samza git commit: Support topic partition generation of partitionBy and join

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 86eb10f2f -> a83c69a25


Support topic partition generation of partitionBy and join
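
The ExecutionPlanner now computes partition counts for the intermediate
topics generated by partitionBy: join input partitions are resolved by
traversing the join graph, and the remaining intermediate streams fall back
to job.default.partitions or, if unset, the max of the input/output topic
partition counts. Intermediate topics are physically named with a
<job.name>-<job.id> prefix and created via the system admins.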


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a83c69a2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a83c69a2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a83c69a2

Branch: refs/heads/samza-fluent-api-v1
Commit: a83c69a25d3b1fa67cac2a9113c57e52d86d4f1d
Parents: 86eb10f
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Thu Mar 2 18:42:23 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri Mar 3 10:03:30 2017 -0800

----------------------------------------------------------------------
 .../samza/operators/MessageStreamImpl.java      |   6 +-
 .../apache/samza/operators/StreamGraphImpl.java |  30 +-
 .../samza/operators/spec/OperatorSpecs.java     |   4 +-
 .../samza/processorgraph/ExecutionPlanner.java  | 213 ++++++++++----
 .../samza/processorgraph/ProcessorGraph.java    |  61 ++--
 .../samza/processorgraph/ProcessorNode.java     |  36 +--
 .../apache/samza/processorgraph/StreamEdge.java |  15 -
 .../system/AbstractExecutionEnvironment.java    |   5 +-
 .../system/RemoteExecutionEnvironment.java      |   2 +-
 .../system/StandaloneExecutionEnvironment.java  |   2 +-
 .../apache/samza/task/StreamOperatorTask.java   |   6 +-
 .../apache/samza/util/ConfigInheritence.java    |   6 +-
 .../org/apache/samza/config/JobConfig.scala     |   1 +
 .../processorgraph/TestExecutionPlanner.java    | 281 +++++++++++++++++++
 14 files changed, 525 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 830e4a5..77cb3fc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -163,10 +163,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
-    MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+    int opId = graph.getNextOpId();
+    String streamId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name(), opId);
+    MessageStreamImpl<M> intStream = this.graph.createIntStream(streamId, parKeyExtractor);
     OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
     this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
-        this.graph, outputStream));
+        this.graph, outputStream, opId));
     return intStream;
   }
   /**
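
For illustration (not part of the patch): the intermediate stream id is now derived
from the op code and the operator id, so assuming getNextOpId() returns 3 here, the
generated id would be:

    // hypothetical opId; in the patch it comes from graph.getNextOpId()
    String streamId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name(), 3);
    // streamId == "PARTITION_BY-3"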

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index b965d6a..f3fd176 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,20 +18,20 @@
  */
 package org.apache.samza.operators;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Function;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.ExecutionEnvironment;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
  * create system input/output/intermediate streams.
@@ -129,9 +129,14 @@ public class StreamGraphImpl implements StreamGraph {
    */
   private final Map<String, MessageStream> inStreams = new HashMap<>();
   private final Map<String, OutputStream> outStreams = new HashMap<>();
+  private final ExecutionEnvironment executionEnvironment;
 
   private ContextManager contextManager = new ContextManager() { };
 
+  public StreamGraphImpl(ExecutionEnvironment executionEnvironment) {
+    this.executionEnvironment = executionEnvironment;
+  }
+
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
     if (!this.inStreams.containsKey(streamSpec.getId())) {
@@ -182,7 +187,12 @@ public class StreamGraphImpl implements StreamGraph {
 
   @Override public Map<StreamSpec, OutputStream> getOutStreams() {
     Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
-    this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
+    this.outStreams.forEach((ss, entry) -> {
+        StreamSpec streamSpec = (entry instanceof IntermediateStreamImpl) ?
+          ((IntermediateStreamImpl) entry).getSpec() :
+          ((OutputStreamImpl) entry).getSpec();
+        outStreamMap.put(streamSpec, entry);
+      });
     return Collections.unmodifiableMap(outStreamMap);
   }
 
@@ -231,9 +241,9 @@ public class StreamGraphImpl implements StreamGraph {
    * @param <M>  the type of input message
    * @return  the {@link OutputStream} object for the re-partitioned stream
    */
-  <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+  <PK, M> MessageStreamImpl<M> createIntStream(String streamId, Function<M, PK> parKeyFn) {
     // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
-    StreamSpec streamSpec = this.createIntStreamSpec();
+    StreamSpec streamSpec = executionEnvironment.streamFromConfig(streamId);
 
     if (!this.inStreams.containsKey(streamSpec.getId())) {
       this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
@@ -244,10 +254,4 @@ public class StreamGraphImpl implements StreamGraph {
     }
     return intStream;
   }
-
-  private StreamSpec createIntStreamSpec() {
-    // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
-    return null;
-  }
-
 }
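
A minimal sketch of how a graph is now wired to an environment (this mirrors the
StreamOperatorTask and StandaloneExecutionEnvironment changes below; the config is
whatever the job was started with):

    ExecutionEnvironment env = ExecutionEnvironment.fromConfig(config);
    StreamGraphImpl graph = new StreamGraphImpl(env);
    // intermediate streams created by partitionBy now resolve their
    // StreamSpec through env.streamFromConfig(streamId)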

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index d626852..7b14b9c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -152,8 +152,8 @@ public class OperatorSpecs {
    * @param <M>  type of input message
    * @return  the {@link SinkOperatorSpec}
    */
-  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
+  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream, int opId) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index ddf2ca7..757034e 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -24,15 +24,20 @@ import com.google.common.collect.Multimap;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
@@ -43,6 +48,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * The ExecutionPlanner creates the physical execution graph for the StreamGraph, as well as
+ * the intermediate topics needed for execution.
+ */
 public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
@@ -56,27 +65,28 @@ public class ExecutionPlanner {
     Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);
 
     // create physical processors based on stream graph
-    ProcessorGraph processorGraph = splitStages(streamGraph);
+    ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
 
     if (!processorGraph.getInternalStreams().isEmpty()) {
-      // figure out the partition for internal streams
-      Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
+      // figure out the partitions for internal streams
+      calculatePartitions(streamGraph, processorGraph, sysAdmins);
 
       // create the streams
-      createStreams(streams, sysAdmins);
+      createStreams(processorGraph, sysAdmins);
     }
 
     return processorGraph;
   }
 
-  public ProcessorGraph splitStages(StreamGraph streamGraph) throws Exception {
-    String pipelineId = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.getOrDefault(JobConfig.JOB_ID(), "1"));
-    // For this phase, we are going to create a processor with the whole dag
-    String processorId = pipelineId; // only one processor, name it the same as pipeline itself
+  /**
+   * Create the physical graph from StreamGraph
+   * Package private for testing
+   */
+  ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
+    // For this phase, we are going to create a processor for the whole dag
+    String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
 
     ProcessorGraph processorGraph = new ProcessorGraph(config);
-
-    // TODO: remote the casting once we have the correct types in StreamGraph
     Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
     Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
@@ -98,49 +108,32 @@ public class ExecutionPlanner {
     return processorGraph;
   }
 
-  private Multimap<String, StreamSpec> calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+  /**
+   * Figure out the number of partitions of intermediate streams
+   * Package private for testing
+   */
+  void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
     // fetch the external streams partition info
-    getExistingStreamPartitions(processorGraph, sysAdmins);
-
-    // use BFS to figure out the join partition count
-
-
-    // TODO this algorithm assumes only one processor, and it does not consider join
-    Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
-    List<ProcessorNode> processors = processorGraph.topologicalSort();
-    processors.forEach(processor -> {
-        Set<StreamEdge> outStreams = new HashSet<>(processor.getOutEdges());
-        outStreams.retainAll(processorGraph.getInternalStreams());
-        if (!outStreams.isEmpty()) {
-          int maxInPartition = maxPartition(processor.getInEdges());
-          int maxOutPartition = maxPartition(processor.getOutEdges());
-          int partition = Math.max(maxInPartition, maxOutPartition);
-
-          outStreams.forEach(streamEdge -> {
-              if (streamEdge.getPartitions() == -1) {
-                streamEdge.setPartitions(partition);
-                StreamSpec streamSpec = createStreamSpec(streamEdge);
-                streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
-              }
-            });
-        }
-      });
+    fetchExistingStreamPartitions(processorGraph, sysAdmins);
 
-    return streamsGroupedBySystem;
+    // calculate the partitions for the input streams of join operators
+    calculateJoinInputPartitions(streamGraph, processorGraph);
+
+    // calculate the partitions for the rest of intermediate streams
+    calculateIntStreamPartitions(processorGraph, config);
   }
 
-  private void getExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
-    Set<StreamEdge> allStreams = new HashSet<>();
-    allStreams.addAll(processorGraph.getSources());
-    allStreams.addAll(processorGraph.getSinks());
-    allStreams.addAll(processorGraph.getInternalStreams());
+  static void fetchExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> existingStreams = new HashSet<>();
+    existingStreams.addAll(processorGraph.getSources());
+    existingStreams.addAll(processorGraph.getSinks());
 
-    Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
-    allStreams.forEach(streamEdge -> {
+    Multimap<String, StreamEdge> existingStreamsMap = HashMultimap.create();
+    existingStreams.forEach(streamEdge -> {
         SystemStream systemStream = streamEdge.getSystemStream();
-        externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+        existingStreamsMap.put(systemStream.getSystem(), streamEdge);
       });
-    for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) {
+    for (Map.Entry<String, Collection<StreamEdge>> entry : existingStreamsMap.asMap().entrySet()) {
       String systemName = entry.getKey();
       Collection<StreamEdge> streamEdges = entry.getValue();
       Map<String, StreamEdge> streamToEdge = new HashMap<>();
@@ -150,18 +143,136 @@ public class ExecutionPlanner {
       metadata.forEach((stream, data) -> {
           int partitions = data.getSystemStreamPartitionMetadata().size();
           streamToEdge.get(stream).setPartitions(partitions);
-          log.info("Partition count is {} for stream {}", partitions, stream);
+          log.debug("Partition count is {} for stream {}", partitions, stream);
         });
     }
   }
 
-  private void createStreams(Multimap<String, StreamSpec> streams, Map<String, SystemAdmin> sysAdmins) {
-    for (Map.Entry<String, Collection<StreamSpec>> entry : streams.asMap().entrySet()) {
+  /**
+   * Calculate the partitions for the input streams of join operators
+   * Package private for testing
+   */
+  static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+    // get join operators with input streams
+    Multimap<OperatorSpec, StreamEdge> joinToStreamMap = HashMultimap.create();
+    Multimap<StreamEdge, OperatorSpec> streamToJoinMap = HashMultimap.create();
+    Map<MessageStream, OperatorSpec> outputToJoinMap = new HashMap<>();
+    Queue<OperatorSpec> joinQ = new LinkedList<>(); // a queue of joins with known input partitions
+    Set<OperatorSpec> visited = new HashSet<>();
+    streamGraph.getInStreams().entrySet().forEach(entry -> {
+        StreamEdge streamEdge = processorGraph.getEdge(entry.getKey());
+        getJoins(entry.getValue(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+      });
+    // calculate join input partition count
+    while (!joinQ.isEmpty()) {
+      OperatorSpec join = joinQ.poll();
+      int partitions = -1;
+      // loop through the input streams to the join and find the partition count
+      for (StreamEdge edge : joinToStreamMap.get(join)) {
+        int edgePartitions = edge.getPartitions();
+        if (edgePartitions != -1) {
+          if (partitions == -1) {
+            // partition count not assigned yet
+            partitions = edgePartitions;
+          } else if (partitions != edgePartitions) {
+            throw new SamzaException(String.format("Unable to resolve input partitions of stream %s for join",
+                edge.getSystemStream().toString()));
+          }
+        }
+      }
+      // assign the partition count
+      for (StreamEdge edge : joinToStreamMap.get(join)) {
+        if (edge.getPartitions() <= 0) {
+          edge.setPartitions(partitions);
+
+          // find other joins that can be inferred by setting this edge
+          for (OperatorSpec op : streamToJoinMap.get(edge)) {
+            if (!visited.contains(op)) {
+              joinQ.add(op);
+              visited.add(op);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Traverses the operators reachable from messageStream, recording each join and its input StreamEdges.
+   * @param messageStream  the MessageStream to traverse
+   * @param streamEdge  the StreamEdge that feeds messageStream
+   * @param joinToStreamMap  mapping from a join spec to its input StreamEdges
+   * @param streamToJoinMap  mapping from a StreamEdge to the join specs it feeds
+   * @param outputToJoinMap  mapping from a join's output stream to the chosen join spec
+   * @param joinQ  queue of joins whose input partitions are known
+   * @param visited  joins that have already been queued
+   */
+  static void getJoins(MessageStream messageStream,
+      StreamEdge streamEdge,
+      Multimap<OperatorSpec, StreamEdge> joinToStreamMap,
+      Multimap<StreamEdge, OperatorSpec> streamToJoinMap,
+      Map<MessageStream, OperatorSpec> outputToJoinMap,
+      Queue<OperatorSpec> joinQ,
+      Set<OperatorSpec> visited) {
+    Collection<OperatorSpec> specs = ((MessageStreamImpl) messageStream).getRegisteredOperatorSpecs();
+    for (OperatorSpec spec : specs) {
+      if (spec instanceof PartialJoinOperatorSpec) {
+        // every join will have two partial join operators
+        // we will choose one of them in order to consolidate the inputs
+        // the first one registered with the outputToJoinMap wins
+        MessageStream output = spec.getNextStream();
+        OperatorSpec joinSpec = outputToJoinMap.get(output);
+        if (joinSpec == null) {
+          joinSpec = spec;
+          outputToJoinMap.put(output, joinSpec);
+        }
+
+        joinToStreamMap.put(joinSpec, streamEdge);
+        streamToJoinMap.put(streamEdge, joinSpec);
+
+        if (!visited.contains(joinSpec) && streamEdge.getPartitions() > 0) {
+          // put the joins with known input partitions into the queue
+          joinQ.add(joinSpec);
+          visited.add(joinSpec);
+        }
+      }
+
+      if (spec.getNextStream() != null) {
+        getJoins(spec.getNextStream(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited);
+      }
+    }
+  }
+
+  static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+    int partitions = config.getInt(JobConfig.JOB_DEFAULT_PARTITIONS(), -1);
+    if (partitions < 0) {
+      // use the following simple algorithm to figure out the partition count
+      // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
+      int maxInPartitions = maxPartition(processorGraph.getSources());
+      int maxOutPartitions = maxPartition(processorGraph.getSinks());
+      partitions = Math.max(maxInPartitions, maxOutPartitions);
+    }
+    for (StreamEdge edge : processorGraph.getInternalStreams()) {
+      if (edge.getPartitions() <= 0) {
+        edge.setPartitions(partitions);
+      }
+    }
+  }
+
+  private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) {
+    Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create();
+    graph.getInternalStreams().forEach(edge -> {
+        StreamSpec streamSpec = createStreamSpec(edge);
+        streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec);
+      });
+
+    for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) {
       String systemName = entry.getKey();
       SystemAdmin systemAdmin = sysAdmins.get(systemName);
 
       for (StreamSpec stream : entry.getValue()) {
-        log.info("Creating stream {} on system {}", stream.getPhysicalName(), systemName);
+        log.info("Creating stream {} with partitions {} on system {}",
+            new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
         systemAdmin.createStream(stream);
       }
     }
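
Two worked examples of the partition calculation, using the fixtures from
TestExecutionPlanner below. For the join graph: input1 (64 partitions) is joined
with the intermediate stream produced by input2's partitionBy, so that join is
queued as soon as input1's count is fetched, and the intermediate edge inherits
64; setting that edge in turn queues the second join (fed by input3's partitionBy
output), whose intermediate edge is also set to 64. For the simple graph: with
job.default.partitions unset, the intermediate stream gets
max(input1 = 64, output1 = 8) = 64 partitions; with job.default.partitions=10 it
gets 10.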

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
index d4ad84b..5ee4d29 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
@@ -37,12 +37,9 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * The ProcessorGraph represents the multi-stage Samza processors of a pipeline on the physical execution layer.
- * High level APIs are transformed into ProcessorGraph for future plan, validation and execution.
- *
- * <p>The ProcessorGraph is a graph of source/sink/intermediate streams and processors are connected together. Each
- * ProcessorNode contains the config which is required to run the processor.
- *
+ * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of execution processors connected by source/sink/intermediate streams.
+ * High-level APIs are transformed into a ProcessorGraph for planning, validation and execution.
  */
 public class ProcessorGraph {
   private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
@@ -64,8 +61,6 @@ public class ProcessorGraph {
     edge.addTargetNode(node);
     node.addInEdge(edge);
     sources.add(edge);
-
-    log.info(edge.toString());
   }
 
   void addSink(StreamSpec output, String sourceProcessorId) {
@@ -74,8 +69,6 @@ public class ProcessorGraph {
     edge.addSourceNode(node);
     node.addOutEdge(edge);
     sinks.add(edge);
-
-    log.info(edge.toString());
   }
 
   void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
@@ -87,8 +80,6 @@ public class ProcessorGraph {
     sourceNode.addOutEdge(edge);
     targetNode.addInEdge(edge);
     internalStreams.add(edge);
-
-    log.info(edge.toString());
   }
 
   ProcessorNode getNode(String processorId) {
@@ -242,18 +233,22 @@ public class ProcessorGraph {
     Collection<ProcessorNode> pnodes = nodes.values();
     Queue<ProcessorNode> q = new ArrayDeque<>();
     Map<String, Long> indegree = new HashMap<>();
+    Set<ProcessorNode> visited = new HashSet<>();
     pnodes.forEach(node -> {
         String nid = node.getId();
+        // only count in-edges that are intermediate streams; nodes consuming only sources get degree 0
         long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
         indegree.put(nid, degree);
 
         if (degree == 0L) {
+          // start from the nodes that only consume from sources
           q.add(node);
+          visited.add(node);
         }
       });
 
     List<ProcessorNode> sortedNodes = new ArrayList<>();
-    Set<ProcessorNode> visited = new HashSet<>();
+    Set<ProcessorNode> reachable = new HashSet<>();
     while (sortedNodes.size() < pnodes.size()) {
       while (!q.isEmpty()) {
         ProcessorNode node = q.poll();
@@ -264,31 +259,41 @@ public class ProcessorGraph {
             indegree.put(nid, degree);
             if (degree == 0L && !visited.contains(n)) {
               q.add(n);
+              visited.add(n);
             }
-            visited.add(n);
+            reachable.add(n);
           });
       }
 
       if (sortedNodes.size() < pnodes.size()) {
         // The remaining nodes have cycles
-        // use the following simple approach to break the circles
-        // start from the node that have been seen
-        visited.removeAll(sortedNodes);
-        //find out the nodes with minimal input edge
-        long min = Long.MAX_VALUE;
-        ProcessorNode minNode = null;
-        for (ProcessorNode node : visited) {
-          Long degree = indegree.get(node.getId());
-          if (degree < min) {
-            min = degree;
-            minNode = node;
+        // use the following approach to break the cycles
+        // start from the nodes that are reachable from the previous traversal
+        reachable.removeAll(sortedNodes);
+        if (!reachable.isEmpty()) {
+          //find out the nodes with minimal input edge
+          long min = Long.MAX_VALUE;
+          ProcessorNode minNode = null;
+          for (ProcessorNode node : reachable) {
+            Long degree = indegree.get(node.getId());
+            if (degree < min) {
+              min = degree;
+              minNode = node;
+            }
           }
+          // start from the node with minimal input edge again
+          q.add(minNode);
+        } else {
+          // all the remaining nodes should be reachable from sources
+          // start from sources again to find the next node that hasn't been visited
+          ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+              .filter(node -> !visited.contains(node))
+              .findAny().get();
+          q.add(nextNode);
         }
-        // start from the node with minimal input edge again
-        q.add(minNode);
       }
     }
 
     return sortedNodes;
   }
-}
\ No newline at end of file
+}
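
A sketch of the new cycle handling, assuming a hypothetical graph
source -> A -> B -> C -> A: once source edges are filtered out, every node has
in-degree 1, so the initial queue and the reachable set are both empty; the else
branch then restarts the traversal from A (a target node of a source edge that
hasn't been visited), which breaks the cycle and lets A, B, C be sorted.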

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
index 0b02377..08d94b4 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.util.ConfigInheritence;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -35,8 +36,9 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * The ProcessorNode represents a Samza processor.
- * It contains the input/output, and the config to run the processor.
+ * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
+ * to a remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
+ * A ProcessorNode contains the input/output streams and the configs needed for physical execution.
  */
 public class ProcessorNode {
   private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
@@ -73,31 +75,15 @@ public class ProcessorNode {
   }
 
   public Config generateConfig() {
-    String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
-    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
-    return Util.rewriteConfig(ConfigInheritence.extractScopedConfig(config, generateProcessorConfig(), configPrefix));
-  }
-
-  private Config generateProcessorConfig() {
     Map<String, String> configs = new HashMap<>();
-    List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
-
-    // TODO temp logs for debugging
-    log.info("Processor {} has formatted inputs {}", id, inputs);
-
-    // TODO hack alert: hard coded string literals!
-    configs.put("task.inputs", Joiner.on(',').join(inputs));
-
-    // TODO: DISCUSS how does the processor know it's output names?
-    outEdges.forEach(edge -> {
-        if (!edge.getName().isEmpty()) {
-          configs.put(String.format("task.outputs.%s.stream", edge.getName()), edge.getFormattedSystemStream());
-        }
-      });
-
     configs.put(JobConfig.JOB_NAME(), id);
 
+    List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
+    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
     log.info("Processor {} has generated configs {}", id, configs);
-    return new MapConfig(configs);
+
+    String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
+    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+    return Util.rewriteConfig(ConfigInheritence.extractScopedConfig(config, new MapConfig(configs), configPrefix));
   }
-}
\ No newline at end of file
+}
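
For illustration (hypothetical processor): a ProcessorNode with id "test-app"
reading input1 from system1 would now generate roughly:

    job.name=test-app
    task.inputs=system1.input1

before the prefix-scoped configs are merged in by ConfigInheritence.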

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
index 879d705..664a458 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
@@ -19,10 +19,8 @@
 
 package org.apache.samza.processorgraph;
 
-import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
@@ -92,17 +90,4 @@ public class StreamEdge {
   void setName(String name) {
     this.name = name;
   }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("StreamEdge ");
-    builder.append(getSystemStream().toString()).append(": (");
-    List<String> sourceIds = sourceNodes.stream().map(node -> node.getId()).collect(Collectors.toList());
-    String sources = Joiner.on(',').join(sourceIds);
-    builder.append(sources).append(") -> (");
-    List<String> targetIds = targetNodes.stream().map(node -> node.getId()).collect(Collectors.toList());
-    String targets = Joiner.on(',').join(targetIds);
-    builder.append(targets).append(")");
-    return builder.toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
index c066bdd..dabe651 100644
--- a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
@@ -20,12 +20,14 @@ package org.apache.samza.system;
 
 import java.util.Map;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StreamConfig;
 
 
 public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
 
   private final Config config;
+  private final String streamPrefix;
 
   public AbstractExecutionEnvironment(Config config) {
     if (config == null) {
@@ -33,6 +35,7 @@ public abstract class AbstractExecutionEnvironment implements ExecutionEnvironme
     }
 
     this.config = config;
+    this.streamPrefix = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.get(JobConfig.JOB_ID(), "1"));
   }
 
   @Override
@@ -40,7 +43,7 @@ public abstract class AbstractExecutionEnvironment implements ExecutionEnvironme
     StreamConfig streamConfig = new StreamConfig(config);
 
     String system = streamConfig.getSystem(streamId);
-    String physicalName = streamConfig.getPhysicalName(streamId, streamId);
+    String physicalName = streamConfig.getPhysicalName(streamId, String.format("%s-%s", streamPrefix, streamId));
     Map<String, String> properties = streamConfig.getStreamProperties(streamId);
 
     return new StreamSpec(streamId, physicalName, system, properties);
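
For example (hypothetical values): with job.name=test-app and job.id unset
(defaulting to "1"), an intermediate stream with id PARTITION_BY-3 and no
physical name override in config resolves to the physical name
test-app-1-PARTITION_BY-3.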

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index ce129aa..3288c5c 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -46,7 +46,7 @@ public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
     // TODO: actually instantiate the tasks and run the job, i.e.
     try {
       // 1. build stream graph
-      StreamGraph streamGraph = new StreamGraphImpl();
+      StreamGraph streamGraph = new StreamGraphImpl(this);
       app.init(streamGraph, config);
 
       // 2. create the physical execution plan

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index 71d60ef..b88e356 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -36,7 +36,7 @@ public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment
 
   // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
   StreamGraph createGraph(StreamGraphBuilder app, Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
+    StreamGraphImpl graph = new StreamGraphImpl(this);
     app.init(graph, config);
     return graph;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index b007e3c..6032f4d 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -25,6 +25,7 @@ import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.ExecutionEnvironment;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 
@@ -77,8 +78,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
   @Override
   public final void init(Config config, TaskContext context) throws Exception {
+    // for now, we need to create the execution environment again
+    // in the future, if we decide to serialize the dag, this can be cleaned up
+    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.fromConfig(config);
     // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
-    StreamGraphImpl streams = new StreamGraphImpl();
+    StreamGraphImpl streams = new StreamGraphImpl(executionEnvironment);
     this.graphBuilder.init(streams, config);
     // get the context manager of the {@link StreamGraph} and initialize the task-specific context
     this.contextManager = streams.getContextManager();

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
index 2eba59b..e4fb32f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
@@ -32,8 +32,8 @@ public class ConfigInheritence {
 
   public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
     Config scopedConfig = fullConfig.subset(configPrefix);
-    log.info("Prefix '{}' has extracted config {}", configPrefix, scopedConfig);
-    log.info("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
+    log.debug("Prefix '{}' has extracted config {}", configPrefix, scopedConfig);
+    log.debug("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
 
     Config[] configPrecedence;
     if (INHERIT_ROOT_CONFIGS) {
@@ -53,7 +53,7 @@ public class ConfigInheritence {
       }
     }
     scopedConfig = new MapConfig(mergedConfig);
-    log.info("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
+    log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
 
     return scopedConfig;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 1c58293..fbfe90f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -45,6 +45,7 @@ object JobConfig {
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
   val JOB_METADATA_DEFAULT_SYSTEM = "job.metadata.system"
   val JOB_DEFAULT_SYSTEM = "job.default.system"
+  val JOB_DEFAULT_PARTITIONS = "job.default.partitions"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
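
A minimal sketch of pinning the intermediate-topic partition count with the new
key (this mirrors testDefaultPartitions below):

    Map<String, String> map = new HashMap<>();
    map.put(JobConfig.JOB_DEFAULT_PARTITIONS(), "10"); // job.default.partitions=10
    Config config = new MapConfig(map);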

http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
new file mode 100644
index 0000000..68a2142
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.system.AbstractExecutionEnvironment;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class TestExecutionPlanner {
+
+  private Config config;
+
+  private ExecutionEnvironment env;
+
+  private static final String DEFAULT_SYSTEM = "test-system";
+  private static final int DEFAULT_PARTITIONS = 10;
+
+  private StreamSpec input1;
+  private StreamSpec input2;
+  private StreamSpec input3;
+  private StreamSpec output1;
+  private StreamSpec output2;
+
+  private Map<String, SystemAdmin> systemAdmins;
+
+  private JoinFunction createJoin() {
+    return new JoinFunction() {
+      @Override
+      public Object apply(Object message, Object otherMessage) {
+        return null;
+      }
+
+      @Override
+      public Object getFirstKey(Object message) {
+        return null;
+      }
+
+      @Override
+      public Object getSecondKey(Object message) {
+        return null;
+      }
+    };
+  }
+
+  private SinkFunction createSink() {
+    return new SinkFunction() {
+      @Override
+      public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
+      }
+    };
+  }
+
+  private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
+
+    return new SystemAdmin() {
+      @Override
+      public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+        return null;
+      }
+
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+        Map<String, SystemStreamMetadata> map = new HashMap<>();
+        for (String stream : streamNames) {
+          Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> m = new HashMap<>();
+          for (int i = 0; i < streamToPartitions.get(stream); i++) {
+            m.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", ""));
+          }
+          map.put(stream, new SystemStreamMetadata(stream, m));
+        }
+        return map;
+      }
+
+      @Override
+      public void createChangelogStream(String streamName, int numOfPartitions) {
+
+      }
+
+      @Override
+      public void validateChangelogStream(String streamName, int numOfPartitions) {
+
+      }
+
+      @Override
+      public void createCoordinatorStream(String streamName) {
+
+      }
+
+      @Override
+      public Integer offsetComparator(String offset1, String offset2) {
+        return null;
+      }
+    };
+  }
+
+  private StreamGraph createSimpleGraph() {
+    /**
+     * a simple graph of partitionBy and map
+     *
+     * input1 -> partitionBy -> map -> output1
+     *
+     */
+    StreamGraph streamGraph = new StreamGraphImpl(env);
+    streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null));
+    return streamGraph;
+  }
+
+  private StreamGraph createStreamGraphWithJoin() {
+
+    /** the graph looks like the following
+     *
+     *                        input1 -> map -> join -> output1
+     *                                           |
+     *          input2 -> partitionBy -> filter -|
+     *                                           |
+     * input3 -> filter -> partitionBy -> map -> join -> output2
+     *
+     */
+
+    StreamGraph streamGraph = new StreamGraphImpl(env);
+    MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
+    MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+
+    m1.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output1, null, null));
+    m3.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output2, null, null));
+
+    return streamGraph;
+  }
+
+  @Before
+  public void setup() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM);
+
+    config = new MapConfig(configMap);
+
+    env = new AbstractExecutionEnvironment(config) {
+      @Override
+      public void run(StreamGraphBuilder graphBuilder, Config config) {
+      }
+    };
+
+    input1 = new StreamSpec("input1", "input1", "system1");
+    input2 = new StreamSpec("input2", "input2", "system2");
+    input3 = new StreamSpec("input3", "input3", "system2");
+
+    output1 = new StreamSpec("output1", "output1", "system1");
+    output2 = new StreamSpec("output2", "output2", "system2");
+
+    // set up external partition count
+    Map<String, Integer> system1Map = new HashMap<>();
+    system1Map.put("input1", 64);
+    system1Map.put("output1", 8);
+    Map<String, Integer> system2Map = new HashMap<>();
+    system2Map.put("input2", 16);
+    system2Map.put("input3", 32);
+    system2Map.put("output2", 16);
+
+    SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
+    SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
+    systemAdmins = new HashMap<>();
+    systemAdmins.put("system1", systemAdmin1);
+    systemAdmins.put("system2", systemAdmin2);
+  }
+
+  @Test
+  public void testCreateProcessorGraph() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    assertTrue(processorGraph.getSources().size() == 3);
+    assertTrue(processorGraph.getSinks().size() == 2);
+    assertTrue(processorGraph.getInternalStreams().size() == 2); // two streams generated by partitionBy
+  }
+
+  @Test
+  public void testFetchExistingStreamPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+
+    ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins);
+    assertTrue(processorGraph.getEdge(input1).getPartitions() == 64);
+    assertTrue(processorGraph.getEdge(input2).getPartitions() == 16);
+    assertTrue(processorGraph.getEdge(input3).getPartitions() == 32);
+    assertTrue(processorGraph.getEdge(output1).getPartitions() == 8);
+    assertTrue(processorGraph.getEdge(output2).getPartitions() == 16);
+
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertTrue(edge.getPartitions() == -1);
+      });
+  }
+
+  @Test
+  public void testCalculateJoinInputPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createStreamGraphWithJoin();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+
+    ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins);
+    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
+
+    // the partitions should be the same as input1
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertTrue(edge.getPartitions() == 64);
+      });
+  }
+
+  @Test
+  public void testDefaultPartitions() {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(JobConfig.JOB_DEFAULT_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg);
+    StreamGraph streamGraph = createSimpleGraph();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+
+    // the partitions should match the configured job.default.partitions
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertTrue(edge.getPartitions() == DEFAULT_PARTITIONS);
+      });
+  }
+
+  @Test
+  public void testCalculateIntStreamPartitions() {
+    ExecutionPlanner planner = new ExecutionPlanner(config);
+    StreamGraph streamGraph = createSimpleGraph();
+    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, processorGraph, systemAdmins);
+
+    // the partitions should be the max of the input and output partition counts
+    processorGraph.getInternalStreams().forEach(edge -> {
+        assertTrue(edge.getPartitions() == 64); // max of input1 and output1
+      });
+  }
+}