Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/26 18:00:59 UTC

[4/4] samza git commit: SAMZA-1814: consolidate JobNode and JobGraph configuration generation for high and low-level API applications

SAMZA-1814: consolidate JobNode and JobGraph configuration generation for high and low-level API applications

High-level changes:
- Move configuration generation to JobNodeConfigurationGenerator
- Move the intermediate stream partition calculation to IntermediateStreamManager
- Consolidate the code in JobPlanner and ExecutionPlanner for high- and low-level API plan/configuration generation (see the condensed flow below)
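
For orientation, the consolidated planning entry point, condensed from the ExecutionPlanner diff further below; the comments are explanatory only, and the fields and helpers (config, streamManager, createJobGraph, setInputAndOutputStreamPartitionCount) are as defined in that file:

  public ExecutionPlan plan(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
    validateConfig();

    // build the physical job graph directly from the application descriptor,
    // which now works for both high-level and low-level API applications
    JobGraph jobGraph = createJobGraph(config, appDesc);

    // fetch partition counts for the external input/output streams
    setInputAndOutputStreamPartitionCount(jobGraph, streamManager);

    // delegate intermediate stream partition calculation to IntermediateStreamManager
    new IntermediateStreamManager(config, appDesc).calculatePartitions(jobGraph);

    return jobGraph;
  }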

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-ld2.linkedin.biz>
Author: Prateek Maheshwari <pm...@linkedin.com>
Author: Prateek Maheshwari <pr...@utexas.edu>
Author: prateekm <pr...@utexas.edu>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Cameron Lee <ca...@linkedin.com>

Closes #642 from nickpan47/SAMZA-1814 and squashes the following commits:

214373966 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814. With minor fixes to allow merge correctly.
f8c8108ac [Yi Pan (Data Infrastructure)] SAMZA-1814: Fix merging errors.
c8681a028 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
b66b9fa9d [Yi Pan (Data Infrastructure)] SAMZA-1814: moving serde generation to a single top-level configuration generation, not embedded in table. Address review comments
0db5068dd [Yi Pan (Data Infrastructure)] SAMZA-1814: fix merge issue and consolidated some test classes
2c856c5f5 [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate configuration generation for high and low-level APIs
ffc6f1a70 [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate configuration generation in ExecutionPlanner between high and low-level API applications
c7fde4a03 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
44844635b [Yi Pan (Data Infrastructure)] SAMZA-1814: merge with master
8797cdd4f [Yi Pan (Data Infrastructure)] SAMZA-1814: merge with master
dae98cebe [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate the configuration generation between high and low-level API applications
3a91b9a62 [Yi Pan (Data Infrastructure)] SAMZA-1814: moving some logic to ApplicationDescriptorImpl to simplify the JobGraph/JobNode code
97c00a2e0 [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP unit tests fixed for configuration generation.
05637e6e6 [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP consolidate all JobGraph and JobNode Json and Config generation code to support both high- and low-level applications
9d564642a [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
16bef1b1b [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP fixing the task application configuration generation in the planner
66af5b706 [Yi Pan (Data Infrastructure)] SAMZA-1789: addressing Cameron's review comments.
ec4bb1dca [Yi Pan (Data Infrastructure)] SAMZA-1789: merge with fix for SAMZA-1836
9c89c63dc [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
91fcd73ae [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
34ffda8ae [Yi Pan (Data Infrastructure)] SAMZA-1789: disabling tests due to SAMZA-1836
02076c850 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed the modifier for the mandatory constructor of ApplicationRunner; Disabled three tests due to wrong configure for test systems
222abf21f [Yi Pan (Data Infrastructure)] SAMZA-1789: added a constructor to StreamProcessor to take a StreamProcessorListenerFactory
7a73992a5 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixing checkstyle and javadoc errors
9997b98bb [Yi Pan (Data Infrastructure)] SAMZA-1789: renamed all ApplicationDescriptor classes with full-spelling of Application
f4b3d43a4 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fixing TaskApplication examples and some checkstyle errors
f2969f8df [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed ApplicationDescriptor to use InputDescriptor and OutputDescriptor; addressed Prateek's comments.
f04404cc2 [Yi Pan (Data Infrastructure)] SAMZA-1789: move createStreams out of the loop in prepareJobs
33753f72d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
12c09af06 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fix a merging error (with SAMZA-1813)
a072118d0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
e7af6932d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
8d4d3ffda [Yi Pan (Data Infrastructure)] Merge with master
055bd91e4 [Yi Pan (Data Infrastructure)] SAMZA-1789: fix unit test with ThreadJobFactory
247dcff4c [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
1621c4d00 [Yi Pan (Data Infrastructure)] SAMZA-1789: a few more fixes to address Cameron's reviews
6e446fe6d [Yi Pan (Data Infrastructure)] SAMZA-1789: address Cameron's review comments.
4382d45db [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
3b2f04d54 [Yi Pan (Data Infrastructure)] SAMZA-1789: moved all impl classes from samza-api to samza-core.
db96da830 [Yi Pan (Data Infrastructure)] SAMZA-1789: WIP - revision to address review feedbacks.
014337170 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
a82708bb0 [Yi Pan (Data Infrastructure)] SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment
c4bb0dce6 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
f20cdcda6 [Yi Pan (Data Infrastructure)] WIP: adding unit tests. Pending update on StreamProcessorLifecycleListener, LocalContainerRunner, and SamzaContainerListener
973eb5261 [Yi Pan (Data Infrastructure)] WIP: compiles, still working on LocalContainerRunner refactor
fb1bc49e0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-spec-with-app-runtime-Jul-16-18
30a4e5f0a [Yi Pan (Data Infrastructure)] WIP: application runner refactor - proto-type for SEP-13
95577b74c [Yi Pan (Data Infrastructure)] WIP: trying to figure out the two interface classes for spec: a) spec builder in init(); b) spec reader in all other lifecycle methods
42782d815 [Yi Pan (Data Infrastructure)] Merge branch 'prateek-remove-app-runner-stream-spec' into app-spec-with-app-runtime-Jul-16-18
d43e92319 [Yi Pan (Data Infrastructure)] WIP: proto-type with ApplicationRunnable and no ApplicationRunner exposed to user
f1cb8f0eb [Yi Pan (Data Infrastructure)] Merge branch 'master' into single-app-api-May-21-18
7e71dc7e0 [Yi Pan (Data Infrastructure)] Merge with master
856193013 [Prateek Maheshwari] Merge branch 'master' into stream-spec-cleanup
7d7aa5088 [Prateek Maheshwari] Updated with Cameron and Daniel's feedback.
8e6fc2dac [prateekm] Remove all usages of StreamSpec and ApplicationRunner from the operator spec and impl layers.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cfbb9c6e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cfbb9c6e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cfbb9c6e

Branch: refs/heads/master
Commit: cfbb9c6ebabfe1e5e13af50a0487bde3f5c1e925
Parents: b842626
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Wed Sep 26 11:00:51 2018 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Sep 26 11:00:51 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../application/ApplicationDescriptorImpl.java  | 106 +++-
 .../StreamApplicationDescriptorImpl.java        |  54 +-
 .../TaskApplicationDescriptorImpl.java          |  14 +
 .../samza/execution/ExecutionPlanner.java       | 288 ++---------
 .../execution/IntermediateStreamManager.java    | 297 +++++++++++
 .../org/apache/samza/execution/JobGraph.java    |  92 ++--
 .../samza/execution/JobGraphJsonGenerator.java  | 110 ++--
 .../org/apache/samza/execution/JobNode.java     | 365 +++----------
 .../JobNodeConfigurationGenerator.java          | 361 +++++++++++++
 .../org/apache/samza/execution/JobPlanner.java  |  67 +--
 .../apache/samza/execution/LocalJobPlanner.java |  21 +-
 .../execution/OperatorSpecGraphAnalyzer.java    |  12 +-
 .../samza/execution/RemoteJobPlanner.java       |  20 +-
 .../samza/operators/BaseTableDescriptor.java    |   9 +
 .../samza/operators/OperatorSpecGraph.java      |   7 -
 .../samza/runtime/LocalContainerRunner.java     |   2 +-
 .../samza/table/TableConfigGenerator.java       |  48 --
 .../samza/zk/ZkJobCoordinatorFactory.java       |   5 +-
 .../org/apache/samza/config/JobConfig.scala     |   4 +-
 .../apache/samza/container/SamzaContainer.scala |   2 +-
 .../MetricsSnapshotReporterFactory.scala        |   1 -
 .../samza/util/CoordinatorStreamUtil.scala      |   2 +-
 .../TestStreamApplicationDescriptorImpl.java    |   3 +-
 .../TestTaskApplicationDescriptorImpl.java      |   8 +-
 .../execution/ExecutionPlannerTestBase.java     | 157 ++++++
 .../samza/execution/TestExecutionPlanner.java   | 181 ++++++-
 .../TestIntermediateStreamManager.java          |  68 +++
 .../apache/samza/execution/TestJobGraph.java    |  34 +-
 .../execution/TestJobGraphJsonGenerator.java    | 147 +++++-
 .../org/apache/samza/execution/TestJobNode.java | 228 ---------
 .../TestJobNodeConfigurationGenerator.java      | 509 +++++++++++++++++++
 .../samza/execution/TestRemoteJobPlanner.java   |   2 +-
 .../samza/operators/TestOperatorSpecGraph.java  |   1 -
 .../operators/spec/OperatorSpecTestUtils.java   |   1 -
 .../runtime/TestLocalApplicationRunner.java     |   9 +-
 .../runtime/TestRemoteApplicationRunner.java    |   2 +-
 .../samza/system/hdfs/HdfsSystemFactory.scala   |   2 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |   2 +-
 .../samza/config/KafkaConsumerConfig.java       |  13 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |   2 +-
 .../kv/BaseLocalStoreBackedTableProvider.java   |  17 +-
 .../apache/samza/test/framework/TestRunner.java |  58 +--
 .../system/InMemorySystemDescriptor.java        |  10 +-
 .../table/TestTableDescriptorsProvider.java     |   6 -
 .../samza/validation/YarnJobValidationTool.java |   2 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   4 +-
 47 files changed, 2162 insertions(+), 1192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 48a28f2..a45a875 100644
--- a/build.gradle
+++ b/build.gradle
@@ -194,6 +194,7 @@ project(":samza-core_$scalaVersion") {
     testCompile "org.powermock:powermock-core:$powerMockVersion"
     testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+    testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
   }
 
   checkstyle {

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
index 9679136..b58d5a5 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -26,13 +27,20 @@ import java.util.Set;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
 import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -46,10 +54,15 @@ import org.apache.samza.task.TaskContext;
  */
 public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
     implements ApplicationDescriptor<S> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
 
-  final Config config;
   private final Class<? extends SamzaApplication> appClass;
   private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
+  // serdes used by input/output/intermediate streams, keyed by streamId
+  private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
+  // serdes used by tables, keyed by tableId
+  private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
+  final Config config;
 
   // Default to no-op functions in ContextManager
   // TODO: this should be replaced by shared context factory defined in SAMZA-1714
@@ -142,6 +155,35 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   /**
+   * Get the corresponding {@link KVSerde} for the stream {@code streamId}
+   *
+   * @param streamId id of the stream
+   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+   */
+  public KV<Serde, Serde> getStreamSerdes(String streamId) {
+    return streamSerdes.get(streamId);
+  }
+
+  /**
+   * Get the corresponding {@link KVSerde} for the table {@code tableId}
+   *
+   * @param tableId id of the table
+   * @return the {@link KVSerde} for the table. null if the serde is not defined or {@code tableId} does not exist
+   */
+  public KV<Serde, Serde> getTableSerdes(String tableId) {
+    return tableSerdes.get(tableId);
+  }
+
+  /**
+   * Get the map of all {@link InputOperatorSpec}s in this application
+   *
+   * @return an immutable map from streamId to {@link InputOperatorSpec}. Defaults to an empty map for low-level {@link TaskApplication}s
+   */
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.EMPTY_MAP;
+  }
+
+  /**
    * Get all the {@link InputDescriptor}s to this application
    *
    * @return an immutable map of streamId to {@link InputDescriptor}
@@ -176,4 +218,66 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
    */
   public abstract Set<SystemDescriptor> getSystemDescriptors();
 
+  /**
+   * Get all the unique input streamIds in this application
+   *
+   * @return an immutable set of input streamIds
+   */
+  public abstract Set<String> getInputStreamIds();
+
+  /**
+   * Get all the unique output streamIds in this application
+   *
+   * @return an immutable set of output streamIds
+   */
+  public abstract Set<String> getOutputStreamIds();
+
+  KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
+    Serde keySerde, valueSerde;
+
+    KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);
+
+    if (serde instanceof KVSerde) {
+      keySerde = ((KVSerde) serde).getKeySerde();
+      valueSerde = ((KVSerde) serde).getValueSerde();
+    } else {
+      keySerde = new NoOpSerde();
+      valueSerde = serde;
+    }
+
+    if (currentSerdePair == null) {
+      if (keySerde instanceof NoOpSerde) {
+        LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+            ". Keys will not be (de)serialized");
+      }
+      if (valueSerde instanceof NoOpSerde) {
+        LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+            ". Values will not be (de)serialized");
+      }
+      streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
+    } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+      throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+          + "different serdes.", streamId));
+    }
+    return streamSerdes.get(streamId);
+  }
+
+  KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
+    Serde keySerde, valueSerde;
+    keySerde = kvSerde.getKeySerde();
+    valueSerde = kvSerde.getValueSerde();
+
+    if (!tableSerdes.containsKey(tableId)) {
+      tableSerdes.put(tableId, KV.of(keySerde, valueSerde));
+      return tableSerdes.get(tableId);
+    }
+
+    KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
+    if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+      throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
+          + "different serdes.", tableId));
+    }
+    return tableSerdes.get(tableId);
+  }
+
 }
\ No newline at end of file
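
The serde bookkeeping added to ApplicationDescriptorImpl above records one key/value serde pair per stream and rejects conflicting re-registration. The following standalone sketch is editorial only: it uses trivial stand-in Serde types rather than the org.apache.samza.serializers classes, but it mirrors the getOrCreateStreamSerdes behavior shown in the diff.

  import java.util.HashMap;
  import java.util.Map;

  public class StreamSerdeRegistrySketch {
    // stand-ins for org.apache.samza.serializers.Serde / NoOpSerde / StringSerde
    interface Serde { }
    static final class NoOpSerde implements Serde { }
    static final class StringSerde implements Serde { }

    // key/value serde pair per streamId, mirroring ApplicationDescriptorImpl#streamSerdes
    private final Map<String, Serde[]> streamSerdes = new HashMap<>();

    Serde[] getOrCreateStreamSerdes(String streamId, Serde keySerde, Serde valueSerde) {
      Serde[] current = streamSerdes.get(streamId);
      if (current == null) {
        // first registration wins; a plain (non-KV) serde arrives here as (NoOpSerde, valueSerde)
        streamSerdes.put(streamId, new Serde[] {keySerde, valueSerde});
      } else if (!current[0].equals(keySerde) || !current[1].equals(valueSerde)) {
        // later registrations for the same stream must agree, otherwise planning fails fast
        throw new IllegalArgumentException("Serde for stream " + streamId + " is already defined");
      }
      return streamSerdes.get(streamId);
    }

    public static void main(String[] args) {
      StreamSerdeRegistrySketch registry = new StreamSerdeRegistrySketch();
      Serde key = new NoOpSerde();
      Serde value = new StringSerde();
      registry.getOrCreateStreamSerdes("pageViews", key, value);   // registered
      registry.getOrCreateStreamSerdes("pageViews", key, value);   // same pair: returned as-is
      try {
        registry.getOrCreateStreamSerdes("pageViews", key, new StringSerde()); // conflicting value serde
      } catch (IllegalArgumentException expected) {
        System.out.println(expected.getMessage());
      }
    }
  }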

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
index d50b0d0..5129913 100644
--- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
@@ -51,7 +51,6 @@ import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
@@ -78,7 +77,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
   // We use a LHM for deterministic order in initializing and closing operators.
   private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
   private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
-  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
+  private final Map<String, TableImpl> tables = new LinkedHashMap<>();
   private final Set<String> operatorIds = new HashSet<>();
 
   private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
@@ -125,7 +124,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
         "getInputStream must not be called multiple times with the same streamId: " + streamId);
 
     Serde serde = inputDescriptor.getSerde();
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
     if (outputStreams.containsKey(streamId)) {
       OutputStreamImpl outputStream = outputStreams.get(streamId);
       Serde keySerde = outputStream.getKeySerde();
@@ -156,7 +155,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
         "getOutputStream must not be called multiple times with the same streamId: " + streamId);
 
     Serde serde = outputDescriptor.getSerde();
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
     if (inputOperators.containsKey(streamId)) {
       InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
       Serde keySerde = inputOperatorSpec.getKeySerde();
@@ -186,13 +185,15 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
         String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
     tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
 
-    TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
-    if (tables.containsKey(tableSpec)) {
+    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+    TableSpec tableSpec = baseTableDescriptor.getTableSpec();
+    if (tables.containsKey(tableSpec.getId())) {
       throw new IllegalStateException(
           String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
     }
-    tables.put(tableSpec, new TableImpl(tableSpec));
-    return tables.get(tableSpec);
+    tables.put(tableSpec.getId(), new TableImpl(tableSpec));
+    getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
+    return tables.get(tableSpec.getId());
   }
 
   /**
@@ -247,6 +248,16 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
   }
 
+  @Override
+  public Set<String> getInputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
+  }
+
+  @Override
+  public Set<String> getOutputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
+  }
+
   /**
    * Get the default {@link SystemDescriptor} in this application
    *
@@ -306,7 +317,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     return Collections.unmodifiableMap(outputStreams);
   }
 
-  public Map<TableSpec, TableImpl> getTables() {
+  public Map<String, TableImpl> getTables() {
     return Collections.unmodifiableMap(tables);
   }
 
@@ -342,7 +353,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
       kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
     } else {
       isKeyed = serde instanceof KVSerde;
-      kvSerdes = getKVSerdes(streamId, serde);
+      kvSerdes = getOrCreateStreamSerdes(streamId, serde);
     }
 
     InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
@@ -356,29 +367,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
   }
 
-  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
-    Serde keySerde, valueSerde;
-
-    if (serde instanceof KVSerde) {
-      keySerde = ((KVSerde) serde).getKeySerde();
-      valueSerde = ((KVSerde) serde).getValueSerde();
-    } else {
-      keySerde = new NoOpSerde();
-      valueSerde = serde;
-    }
-
-    if (keySerde instanceof NoOpSerde) {
-      LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
-          ". Keys will not be (de)serialized");
-    }
-    if (valueSerde instanceof NoOpSerde) {
-      LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
-          ". Values will not be (de)serialized");
-    }
-
-    return KV.of(keySerde, valueSerde);
-  }
-
   // check uniqueness of the {@code systemDescriptor} and add if it is unique
   private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
     Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
index 3597d7c..d140a90 100644
--- a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
@@ -25,6 +25,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
@@ -65,6 +66,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
     // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
     Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
         String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+    getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
     inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
     addSystemDescriptor(inputDescriptor.getSystemDescriptor());
   }
@@ -73,6 +75,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
   public void addOutputStream(OutputDescriptor outputDescriptor) {
     Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
         String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+    getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
     outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
     addSystemDescriptor(outputDescriptor.getSystemDescriptor());
   }
@@ -81,6 +84,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
   public void addTable(TableDescriptor tableDescriptor) {
     Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
         String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+    getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
     tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
   }
 
@@ -111,6 +115,16 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
     return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
   }
 
+  @Override
+  public Set<String> getInputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
+  }
+
+  @Override
+  public Set<String> getOutputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
+  }
+
   /**
    * Get the user-defined {@link TaskFactory}
    * @return the {@link TaskFactory} object

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 46aef8d..eea6387 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -22,72 +22,57 @@ package org.apache.samza.execution;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.JoinOperatorSpec;
+import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.execution.ExecutionPlanner.StreamEdgeSet.StreamEdgeSetCategory;
 import static org.apache.samza.util.StreamUtil.*;
 
 
 /**
- * The ExecutionPlanner creates the physical execution graph for the {@link OperatorSpecGraph}, and
+ * The ExecutionPlanner creates the physical execution graph for the {@link ApplicationDescriptorImpl}, and
  * the intermediate topics needed for the execution.
  */
 // TODO: ExecutionPlanner needs to be able to generate single node JobGraph for low-level TaskApplication as well (SAMZA-1811)
 public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
-  /* package private */ static final int MAX_INFERRED_PARTITIONS = 256;
-
   private final Config config;
-  private final StreamConfig streamConfig;
   private final StreamManager streamManager;
 
   public ExecutionPlanner(Config config, StreamManager streamManager) {
     this.config = config;
     this.streamManager = streamManager;
-    this.streamConfig = new StreamConfig(config);
   }
 
-  public ExecutionPlan plan(OperatorSpecGraph opSpecGraph) {
+  public ExecutionPlan plan(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
     validateConfig();
 
-    // Create physical job graph based on stream graph
-    JobGraph jobGraph = createJobGraph(opSpecGraph);
-
-    // Fetch the external streams partition info
-    fetchInputAndOutputStreamPartitions(jobGraph);
+    // create physical job graph based on stream graph
+    JobGraph jobGraph = createJobGraph(config, appDesc);
 
-    // Verify agreement in partition count between all joined input/intermediate streams
-    validateJoinInputStreamPartitions(jobGraph);
+    // fetch the external streams partition info
+    setInputAndOutputStreamPartitionCount(jobGraph, streamManager);
 
-    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
-      // Set partition count of intermediate streams not participating in joins
-      setIntermediateStreamPartitions(jobGraph);
-
-      // Validate partition counts were assigned for all intermediate streams
-      validateIntermediateStreamPartitions(jobGraph);
-    }
+    // figure out the partitions for internal streams
+    new IntermediateStreamManager(config, appDesc).calculatePartitions(jobGraph);
 
     return jobGraph;
   }
@@ -103,21 +88,23 @@ public class ExecutionPlanner {
   }
 
   /**
-   * Creates the physical graph from {@link OperatorSpecGraph}
+   * Create the physical graph from {@link ApplicationDescriptorImpl}
    */
-  /* package private */ JobGraph createJobGraph(OperatorSpecGraph opSpecGraph) {
-    JobGraph jobGraph = new JobGraph(config, opSpecGraph);
-
+  /* package private */
+  JobGraph createJobGraph(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    JobGraph jobGraph = new JobGraph(config, appDesc);
+    StreamConfig streamConfig = new StreamConfig(config);
     // Source streams contain both input and intermediate streams.
-    Set<StreamSpec> sourceStreams = getStreamSpecs(opSpecGraph.getInputOperators().keySet(), streamConfig);
+    Set<StreamSpec> sourceStreams = getStreamSpecs(appDesc.getInputStreamIds(), streamConfig);
     // Sink streams contain both output and intermediate streams.
-    Set<StreamSpec> sinkStreams = getStreamSpecs(opSpecGraph.getOutputStreams().keySet(), streamConfig);
+    Set<StreamSpec> sinkStreams = getStreamSpecs(appDesc.getOutputStreamIds(), streamConfig);
 
     Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams);
     Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
     Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
 
-    Set<TableSpec> tables = opSpecGraph.getTables().keySet();
+    Set<TableSpec> tables = appDesc.getTableDescriptors().stream()
+        .map(tableDescriptor -> ((BaseTableDescriptor) tableDescriptor).getTableSpec()).collect(Collectors.toSet());
 
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
@@ -136,15 +123,20 @@ public class ExecutionPlanner {
     // Add tables
     tables.forEach(spec -> jobGraph.addTable(spec, node));
 
-    jobGraph.validate();
+    if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+      // skip the validation when input streamIds are empty. This is only possible for LegacyTaskApplication
+      jobGraph.validate();
+    }
 
     return jobGraph;
   }
 
   /**
-   * Fetches the partitions of input/output streams and update the corresponding StreamEdges.
+   * Fetch the partitions of source/sink streams and update the StreamEdges.
+   * @param jobGraph {@link JobGraph}
+   * @param streamManager the {@link StreamManager} to interface with the streams.
    */
-  /* package private */ void fetchInputAndOutputStreamPartitions(JobGraph jobGraph) {
+  /* package private */ static void setInputAndOutputStreamPartitionCount(JobGraph jobGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
     existingStreams.addAll(jobGraph.getInputStreams());
     existingStreams.addAll(jobGraph.getOutputStreams());
@@ -182,224 +174,4 @@ public class ExecutionPlanner {
     }
   }
 
-  /**
-   * Validates agreement in partition count between input/intermediate streams participating in join operations.
-   */
-  private void validateJoinInputStreamPartitions(JobGraph jobGraph) {
-    // Group input operator specs (input/intermediate streams) by the joins they participate in.
-    Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
-        OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(jobGraph.getSpecGraph());
-
-    // Convert every group of input operator specs into a group of corresponding stream edges.
-    List<StreamEdgeSet> streamEdgeSets = new ArrayList<>();
-    for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
-      Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec);
-      StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph);
-      streamEdgeSets.add(streamEdgeSet);
-    }
-
-    /*
-     * Sort the stream edge groups by their category so they appear in this order:
-     *   1. groups composed exclusively of stream edges with set partition counts
-     *   2. groups composed of a mix of stream edges  with set/unset partition counts
-     *   3. groups composed exclusively of stream edges with unset partition counts
-     *
-     *   This guarantees that we process the most constrained stream edge groups first,
-     *   which is crucial for intermediate stream edges that are members of multiple
-     *   stream edge groups. For instance, if we have the following groups of stream
-     *   edges (partition counts in parentheses, question marks for intermediate streams):
-     *
-     *      a. e1 (16), e2 (16)
-     *      b. e2 (16), e3 (?)
-     *      c. e3 (?), e4 (?)
-     *
-     *   processing them in the above order (most constrained first) is guaranteed to
-     *   yield correct assignment of partition counts of e3 and e4 in a single scan.
-     */
-    Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder()));
-
-    // Verify agreement between joined input/intermediate streams.
-    // This may involve setting partition counts of intermediate stream edges.
-    streamEdgeSets.forEach(ExecutionPlanner::validateAndAssignStreamEdgeSetPartitions);
-  }
-
-  /**
-   * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s
-   * correspond to the provided {@code inputOpSpecs}.
-   */
-  private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs,
-      JobGraph jobGraph) {
-
-    int countStreamEdgeWithSetPartitions = 0;
-    Set<StreamEdge> streamEdges = new HashSet<>();
-
-    for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
-      StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(inputOpSpec.getStreamId(), streamConfig));
-      if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) {
-        ++countStreamEdgeWithSetPartitions;
-      }
-      streamEdges.add(streamEdge);
-    }
-
-    // Determine category of stream group based on stream partition counts.
-    StreamEdgeSetCategory category;
-    if (countStreamEdgeWithSetPartitions == 0) {
-      category = StreamEdgeSetCategory.NO_PARTITION_COUNT_SET;
-    } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) {
-      category = StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET;
-    } else {
-      category = StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET;
-    }
-
-    return new StreamEdgeSet(setId, streamEdges, category);
-  }
-
-  /**
-   * Sets partition count of intermediate streams which have not been assigned partition counts.
-   */
-  private void setIntermediateStreamPartitions(JobGraph jobGraph) {
-    final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
-    int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
-    if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
-      // use the following simple algo to figure out the partitions
-      // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
-      // partition will be further bounded by MAX_INFERRED_PARTITIONS.
-      // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
-      int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
-      int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
-      partitions = Math.max(maxInPartitions, maxOutPartitions);
-
-      if (partitions > MAX_INFERRED_PARTITIONS) {
-        partitions = MAX_INFERRED_PARTITIONS;
-        log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
-            partitions, MAX_INFERRED_PARTITIONS));
-      }
-    } else {
-      // Reject any zero or other negative values explicitly specified in config.
-      if (partitions <= 0) {
-        throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
-            defaultPartitionsConfigProperty));
-      }
-
-      log.info("Using partition count value {} specified for config property {}", partitions,
-          defaultPartitionsConfigProperty);
-    }
-
-    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
-      if (edge.getPartitionCount() <= 0) {
-        log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
-        edge.setPartitionCount(partitions);
-      }
-    }
-  }
-
-  /**
-   * Ensures all intermediate streams have been assigned partition counts.
-   */
-  private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
-    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
-      if (edge.getPartitionCount() <= 0) {
-        throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
-      }
-    }
-  }
-
-  /**
-   * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count.
-   * This may include setting partition counts of intermediate streams in this set that do not
-   * have their partition counts set.
-   */
-  private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) {
-    Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges();
-    StreamEdge firstStreamEdgeWithSetPartitions =
-        streamEdges.stream()
-            .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN)
-            .findFirst()
-            .orElse(null);
-
-    // This group consists exclusively of intermediate streams with unknown partition counts.
-    // We cannot do any validation/computation of partition counts of such streams right here,
-    // but they are tackled later in the ExecutionPlanner.
-    if (firstStreamEdgeWithSetPartitions == null) {
-      return;
-    }
-
-    // Make sure all other stream edges in this group have the same partition count.
-    int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount();
-    for (StreamEdge streamEdge : streamEdges) {
-      int streamPartitions = streamEdge.getPartitionCount();
-      if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) {
-        streamEdge.setPartitionCount(partitions);
-        log.info("Inferred the partition count {} for the join operator {} from {}."
-            , new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()});
-      } else if (streamPartitions != partitions) {
-        throw  new SamzaException(String.format(
-            "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
-            streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions));
-      }
-    }
-  }
-
-  /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
-    return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
-  }
-
-  /**
-   * Represents a set of {@link StreamEdge}s.
-   */
-  /* package private */ static class StreamEdgeSet {
-
-    /**
-     * Indicates whether all stream edges in this group have their partition counts assigned.
-     */
-    public enum StreamEdgeSetCategory {
-      /**
-       * All stream edges in this group have their partition counts assigned.
-       */
-      ALL_PARTITION_COUNT_SET(0),
-
-      /**
-       * Only some stream edges in this group have their partition counts assigned.
-       */
-      SOME_PARTITION_COUNT_SET(1),
-
-      /**
-       * No stream edge in this group is assigned a partition count.
-       */
-      NO_PARTITION_COUNT_SET(2);
-
-
-      private final int sortOrder;
-
-      StreamEdgeSetCategory(int sortOrder) {
-        this.sortOrder = sortOrder;
-      }
-
-      public int getSortOrder() {
-        return sortOrder;
-      }
-    }
-
-    private final String setId;
-    private final Set<StreamEdge> streamEdges;
-    private final StreamEdgeSetCategory category;
-
-    public StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) {
-      this.setId = setId;
-      this.streamEdges = streamEdges;
-      this.category = category;
-    }
-
-    public Set<StreamEdge> getStreamEdges() {
-      return streamEdges;
-    }
-
-    public String getSetId() {
-      return setId;
-    }
-
-    public StreamEdgeSetCategory getCategory() {
-      return category;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
new file mode 100644
index 0000000..66cbe6a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IntermediateStreamManager} calculates intermediate stream partitions based on the high-level application graph.
+ */
+class IntermediateStreamManager {
+
+  private static final Logger log = LoggerFactory.getLogger(IntermediateStreamManager.class);
+
+  private final Config config;
+  private final Map<String, InputOperatorSpec> inputOperators;
+
+  @VisibleForTesting
+  static final int MAX_INFERRED_PARTITIONS = 256;
+
+  IntermediateStreamManager(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    this.config = config;
+    this.inputOperators = appDesc.getInputOperators();
+  }
+
+  /**
+   * Figure out the number of partitions of all streams
+   */
+  /* package private */ void calculatePartitions(JobGraph jobGraph) {
+
+    // Verify agreement in partition count between all joined input/intermediate streams
+    validateJoinInputStreamPartitions(jobGraph);
+
+    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
+      // Set partition count of intermediate streams not participating in joins
+      setIntermediateStreamPartitions(jobGraph);
+
+      // Validate partition counts were assigned for all intermediate streams
+      validateIntermediateStreamPartitions(jobGraph);
+    }
+  }
+
+  /**
+   * Validates agreement in partition count between input/intermediate streams participating in join operations.
+   */
+  private void validateJoinInputStreamPartitions(JobGraph jobGraph) {
+    // Group input operator specs (input/intermediate streams) by the joins they participate in.
+    Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
+        OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(inputOperators.values());
+
+    // Convert every group of input operator specs into a group of corresponding stream edges.
+    List<StreamEdgeSet> streamEdgeSets = new ArrayList<>();
+    for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
+      Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec);
+      StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph);
+      streamEdgeSets.add(streamEdgeSet);
+    }
+
+    /*
+     * Sort the stream edge groups by their category so they appear in this order:
+     *   1. groups composed exclusively of stream edges with set partition counts
+     *   2. groups composed of a mix of stream edges  with set/unset partition counts
+     *   3. groups composed exclusively of stream edges with unset partition counts
+     *
+     *   This guarantees that we process the most constrained stream edge groups first,
+     *   which is crucial for intermediate stream edges that are members of multiple
+     *   stream edge groups. For instance, if we have the following groups of stream
+     *   edges (partition counts in parentheses, question marks for intermediate streams):
+     *
+     *      a. e1 (16), e2 (16)
+     *      b. e2 (16), e3 (?)
+     *      c. e3 (?), e4 (?)
+     *
+     *   processing them in the above order (most constrained first) is guaranteed to
+     *   yield correct assignment of partition counts of e3 and e4 in a single scan.
+     */
+    Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder()));
+
+    // Verify agreement between joined input/intermediate streams.
+    // This may involve setting partition counts of intermediate stream edges.
+    streamEdgeSets.forEach(IntermediateStreamManager::validateAndAssignStreamEdgeSetPartitions);
+  }
+
+  /**
+   * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s
+   * correspond to the provided {@code inputOpSpecs}.
+   */
+  private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs,
+      JobGraph jobGraph) {
+
+    int countStreamEdgeWithSetPartitions = 0;
+    Set<StreamEdge> streamEdges = new HashSet<>();
+
+    for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+      StreamEdge streamEdge = jobGraph.getStreamEdge(inputOpSpec.getStreamId());
+      if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) {
+        ++countStreamEdgeWithSetPartitions;
+      }
+      streamEdges.add(streamEdge);
+    }
+
+    // Determine category of stream group based on stream partition counts.
+    StreamEdgeSet.StreamEdgeSetCategory category;
+    if (countStreamEdgeWithSetPartitions == 0) {
+      category = StreamEdgeSet.StreamEdgeSetCategory.NO_PARTITION_COUNT_SET;
+    } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) {
+      category = StreamEdgeSet.StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET;
+    } else {
+      category = StreamEdgeSet.StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET;
+    }
+
+    return new StreamEdgeSet(setId, streamEdges, category);
+  }
+
+  /**
+   * Sets partition count of intermediate streams which have not been assigned partition counts.
+   */
+  private void setIntermediateStreamPartitions(JobGraph jobGraph) {
+    final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
+    int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
+    if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
+      // use the following simple algo to figure out the partitions
+      // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
+      // partition will be further bounded by MAX_INFERRED_PARTITIONS.
+      // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
+      int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
+      int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
+      partitions = Math.max(maxInPartitions, maxOutPartitions);
+
+      if (partitions > MAX_INFERRED_PARTITIONS) {
+        partitions = MAX_INFERRED_PARTITIONS;
+        log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
+            partitions, MAX_INFERRED_PARTITIONS));
+      }
+    } else {
+      // Reject any zero or other negative values explicitly specified in config.
+      if (partitions <= 0) {
+        throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
+            defaultPartitionsConfigProperty));
+      }
+
+      log.info("Using partition count value {} specified for config property {}", partitions,
+          defaultPartitionsConfigProperty);
+    }
+
+    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
+      if (edge.getPartitionCount() <= 0) {
+        log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
+        edge.setPartitionCount(partitions);
+      }
+    }
+  }
+
+  /**
+   * Ensures all intermediate streams have been assigned partition counts.
+   */
+  private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
+      if (edge.getPartitionCount() <= 0) {
+        throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
+      }
+    }
+  }
+
+  /**
+   * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count.
+   * This may include setting partition counts of intermediate streams in this set that do not
+   * have their partition counts set.
+   */
+  private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) {
+    Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges();
+    StreamEdge firstStreamEdgeWithSetPartitions =
+        streamEdges.stream()
+            .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN)
+            .findFirst()
+            .orElse(null);
+
+    // This group consists exclusively of intermediate streams with unknown partition counts.
+    // We cannot do any validation/computation of partition counts of such streams right here,
+    // but they are tackled later in the ExecutionPlanner.
+    if (firstStreamEdgeWithSetPartitions == null) {
+      return;
+    }
+
+    // Make sure all other stream edges in this group have the same partition count.
+    int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount();
+    for (StreamEdge streamEdge : streamEdges) {
+      int streamPartitions = streamEdge.getPartitionCount();
+      if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) {
+        streamEdge.setPartitionCount(partitions);
+        log.info("Inferred the partition count {} for the join operator {} from {}.",
+            new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()});
+      } else if (streamPartitions != partitions) {
+        throw  new SamzaException(String.format(
+            "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
+            streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions));
+      }
+    }
+  }
+
+  /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
+  }
+
+  /**
+   * Represents a set of {@link StreamEdge}s.
+   */
+  /* package private */ static class StreamEdgeSet {
+
+    /**
+     * Indicates whether all stream edges in this group have their partition counts assigned.
+     */
+    public enum StreamEdgeSetCategory {
+      /**
+       * All stream edges in this group have their partition counts assigned.
+       */
+      ALL_PARTITION_COUNT_SET(0),
+
+      /**
+       * Only some stream edges in this group have their partition counts assigned.
+       */
+      SOME_PARTITION_COUNT_SET(1),
+
+      /**
+       * No stream edge in this group is assigned a partition count.
+       */
+      NO_PARTITION_COUNT_SET(2);
+
+
+      private final int sortOrder;
+
+      StreamEdgeSetCategory(int sortOrder) {
+        this.sortOrder = sortOrder;
+      }
+
+      public int getSortOrder() {
+        return sortOrder;
+      }
+    }
+
+    private final String setId;
+    private final Set<StreamEdge> streamEdges;
+    private final StreamEdgeSetCategory category;
+
+    StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) {
+      this.setId = setId;
+      this.streamEdges = streamEdges;
+      this.category = category;
+    }
+
+    Set<StreamEdge> getStreamEdges() {
+      return streamEdges;
+    }
+
+    String getSetId() {
+      return setId;
+    }
+
+    StreamEdgeSetCategory getCategory() {
+      return category;
+    }
+  }
+}

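For reference, here is a minimal, self-contained sketch of the partition inference performed by validateAndAssignStreamEdgeSetPartitions above. The Edge class and the names below are hypothetical stand-ins for StreamEdge and its API (with -1 mirroring StreamEdge.PARTITIONS_UNKNOWN): the first edge with a known partition count seeds the rest of the group, a conflicting known count is rejected, and a group with no known count is left for the planner to resolve later.

import java.util.Arrays;
import java.util.List;

// Minimal sketch of join partition inference; Edge is a hypothetical stand-in for StreamEdge.
public class JoinPartitionInferenceSketch {
  static final int PARTITIONS_UNKNOWN = -1;

  static class Edge {
    final String name;
    int partitions;
    Edge(String name, int partitions) { this.name = name; this.partitions = partitions; }
  }

  // Seed unknown partition counts from the first edge with a known count;
  // reject the group if two known counts disagree.
  static void assignJoinPartitions(List<Edge> joinInputs) {
    Edge seed = joinInputs.stream()
        .filter(e -> e.partitions != PARTITIONS_UNKNOWN)
        .findFirst()
        .orElse(null);
    if (seed == null) {
      return; // all inputs are intermediate streams with unknown counts; resolved later by the planner
    }
    for (Edge e : joinInputs) {
      if (e.partitions == PARTITIONS_UNKNOWN) {
        e.partitions = seed.partitions;
      } else if (e.partitions != seed.partitions) {
        throw new IllegalStateException(String.format(
            "Join inputs disagree on partitions: %s has %d, expected %d",
            e.name, e.partitions, seed.partitions));
      }
    }
  }

  public static void main(String[] args) {
    List<Edge> inputs = Arrays.asList(
        new Edge("page-views", 16),
        new Edge("page-views-repartitioned", PARTITIONS_UNKNOWN));
    assignJoinPartitions(inputs);
    inputs.forEach(e -> System.out.println(e.name + " -> " + e.partitions)); // both print 16
  }
}
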
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 5b19095..d975188 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -31,10 +31,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
@@ -59,16 +60,21 @@ import org.slf4j.LoggerFactory;
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
-  private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
-  private final OperatorSpecGraph specGraph;
+  private final JobGraphJsonGenerator jsonGenerator;
+  private final JobNodeConfigurationGenerator configGenerator;
+  private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
 
   /**
    * The JobGraph is only constructed by the {@link ExecutionPlanner}.
-   * @param config Config
+   *
+   * @param config configuration for the application
+   * @param appDesc {@link ApplicationDescriptorImpl} describing the application
    */
-  JobGraph(Config config, OperatorSpecGraph specGraph) {
+  JobGraph(Config config, ApplicationDescriptorImpl appDesc) {
     this.config = config;
-    this.specGraph = specGraph;
+    this.appDesc = appDesc;
+    this.jsonGenerator = new JobGraphJsonGenerator();
+    this.configGenerator = new JobNodeConfigurationGenerator();
   }
 
   @Override
@@ -91,11 +97,6 @@ import org.slf4j.LoggerFactory;
         .collect(Collectors.toList());
   }
 
-  void addTable(TableSpec tableSpec, JobNode node) {
-    tables.add(tableSpec);
-    node.addTable(tableSpec);
-  }
-
   @Override
   public String getPlanAsJson() throws Exception {
     return jsonGenerator.toJson(this);
@@ -105,14 +106,11 @@ import org.slf4j.LoggerFactory;
    * Returns the config for this application
    * @return {@link ApplicationConfig}
    */
+  @Override
   public ApplicationConfig getApplicationConfig() {
     return new ApplicationConfig(config);
   }
 
-  public OperatorSpecGraph getSpecGraph() {
-    return specGraph;
-  }
-
   /**
    * Add a source stream to a {@link JobNode}
    * @param streamSpec input stream
@@ -152,20 +150,20 @@ import org.slf4j.LoggerFactory;
     intermediateStreams.add(edge);
   }
 
+  void addTable(TableSpec tableSpec, JobNode node) {
+    tables.add(tableSpec);
+    node.addTable(tableSpec);
+  }
+
   /**
    * Get the {@link JobNode}. Create one if it does not exist.
    * @param jobName name of the job
    * @param jobId id of the job
-   * @return
+   * @return the {@link JobNode} for {@code jobName} and {@code jobId}, created if it did not already exist
    */
   JobNode getOrCreateJobNode(String jobName, String jobId) {
-    String nodeId = JobNode.createId(jobName, jobId);
-    JobNode node = nodes.get(nodeId);
-    if (node == null) {
-      node = new JobNode(jobName, jobId, specGraph, config);
-      nodes.put(nodeId, node);
-    }
-    return node;
+    String nodeId = JobNode.createJobNameAndId(jobName, jobId);
+    return nodes.computeIfAbsent(nodeId, k -> new JobNode(jobName, jobId, config, appDesc, configGenerator));
   }
 
   /**
@@ -178,20 +176,13 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
-   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
-   * @param streamSpec  spec of the StreamEdge
-   * @param isIntermediate  boolean flag indicating whether it's an intermediate stream
+   * Get the {@link StreamEdge} for {@code streamId}.
+   *
+   * @param streamId the streamId for the {@link StreamEdge}
    * @return stream edge
    */
-  StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
-    String streamId = streamSpec.getId();
-    StreamEdge edge = edges.get(streamId);
-    if (edge == null) {
-      boolean isBroadcast = specGraph.getBroadcastStreams().contains(streamId);
-      edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
-      edges.put(streamId, edge);
-    }
-    return edge;
+  StreamEdge getStreamEdge(String streamId) {
+    return edges.get(streamId);
   }
 
   /**
@@ -248,6 +239,23 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
+   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
+   * @param streamSpec  spec of the StreamEdge
+   * @param isIntermediate  boolean flag indicating whether it's an intermediate stream
+   * @return stream edge
+   */
+  private StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
+    String streamId = streamSpec.getId();
+    StreamEdge edge = edges.get(streamId);
+    if (edge == null) {
+      boolean isBroadcast = appDesc.getBroadcastStreams().contains(streamId);
+      edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
+      edges.put(streamId, edge);
+    }
+    return edge;
+  }
+
+  /**
   * Validate that input streams have indegree 0 and outdegree greater than 0
    */
   private void validateInputStreams() {
@@ -305,7 +313,7 @@ import org.slf4j.LoggerFactory;
       Set<JobNode> unreachable = new HashSet<>(nodes.values());
       unreachable.removeAll(reachable);
       throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.",
-          String.join(", ", unreachable.stream().map(JobNode::getId).collect(Collectors.toList()))));
+          String.join(", ", unreachable.stream().map(JobNode::getJobNameAndId).collect(Collectors.toList()))));
     }
   }
 
@@ -325,7 +333,7 @@ import org.slf4j.LoggerFactory;
 
     while (!queue.isEmpty()) {
       JobNode node = queue.poll();
-      node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
+      node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
           if (!visited.contains(target)) {
             visited.add(target);
             queue.offer(target);
@@ -351,9 +359,9 @@ import org.slf4j.LoggerFactory;
     Map<String, Long> indegree = new HashMap<>();
     Set<JobNode> visited = new HashSet<>();
     pnodes.forEach(node -> {
-        String nid = node.getId();
+        String nid = node.getJobNameAndId();
         //only count the degrees of intermediate streams
-        long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count();
+        long degree = node.getInEdges().values().stream().filter(e -> !inputStreams.contains(e)).count();
         indegree.put(nid, degree);
 
         if (degree == 0L) {
@@ -378,8 +386,8 @@ import org.slf4j.LoggerFactory;
       while (!q.isEmpty()) {
         JobNode node = q.poll();
         sortedNodes.add(node);
-        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
-            String nid = n.getId();
+        node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+            String nid = n.getJobNameAndId();
             Long degree = indegree.get(nid) - 1;
             indegree.put(nid, degree);
             if (degree == 0L && !visited.contains(n)) {
@@ -400,7 +408,7 @@ import org.slf4j.LoggerFactory;
           long min = Long.MAX_VALUE;
           JobNode minNode = null;
           for (JobNode node : reachable) {
-            Long degree = indegree.get(node.getId());
+            Long degree = indegree.get(node.getJobNameAndId());
             if (degree < min) {
               min = degree;
               minNode = node;

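The reachability validation in JobGraph above ("Jobs %s cannot be reached from Sources.") is a breadth-first traversal starting from the jobs fed by source streams; any job left unvisited is unreachable. The sketch below illustrates the same idea over plain strings; ReachabilitySketch, findReachable, and the job names are hypothetical and not part of the JobGraph API.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the breadth-first reachability check: start from the jobs that
// consume source streams and follow outgoing edges; anything left unvisited is unreachable.
public class ReachabilitySketch {
  static Set<String> findReachable(Map<String, List<String>> outEdges, Set<String> sourceJobs) {
    Set<String> visited = new HashSet<>(sourceJobs);
    Deque<String> queue = new ArrayDeque<>(sourceJobs);
    while (!queue.isEmpty()) {
      String job = queue.poll();
      for (String target : outEdges.getOrDefault(job, Collections.emptyList())) {
        if (visited.add(target)) { // add() returns false when the target was already visited
          queue.offer(target);
        }
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    Map<String, List<String>> outEdges = new HashMap<>();
    outEdges.put("ingest", Arrays.asList("enrich"));
    outEdges.put("enrich", Arrays.asList("aggregate"));
    outEdges.put("orphan", Collections.emptyList()); // not downstream of any source

    Set<String> reachable = findReachable(outEdges, new HashSet<>(Arrays.asList("ingest")));
    Set<String> unreachable = new HashSet<>(outEdges.keySet());
    unreachable.removeAll(reachable);
    System.out.println("Jobs " + unreachable + " cannot be reached from sources."); // [orphan]
  }
}
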
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 91453d2..18705e4 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -32,7 +32,6 @@ import java.util.stream.Collectors;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -140,7 +139,7 @@ import org.codehaus.jackson.map.ObjectMapper;
     jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
 
     jobGraphJson.jobs = jobGraph.getJobNodes().stream()
-        .map(jobNode -> buildJobNodeJson(jobNode))
+        .map(this::buildJobNodeJson)
         .collect(Collectors.toList());
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -149,54 +148,12 @@ import org.codehaus.jackson.map.ObjectMapper;
     return new String(out.toByteArray());
   }
 
-  /**
-   * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job
-   * @param jobNode job node in the {@link JobGraph}
-   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
-   */
-  private JobNodeJson buildJobNodeJson(JobNode jobNode) {
-    JobNodeJson job = new JobNodeJson();
-    job.jobName = jobNode.getJobName();
-    job.jobId = jobNode.getJobId();
-    job.operatorGraph = buildOperatorGraphJson(jobNode);
-    return job;
-  }
-
-  /**
-   * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
-   * @param jobNode job node in the {@link JobGraph}
-   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
-   */
-  private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
-    OperatorGraphJson opGraph = new OperatorGraphJson();
-    opGraph.inputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getInputOperators().forEach((streamId, operatorSpec) -> {
-        StreamJson inputJson = new StreamJson();
-        opGraph.inputStreams.add(inputJson);
-        inputJson.streamId = streamId;
-        inputJson.nextOperatorIds = operatorSpec.getRegisteredOperatorSpecs().stream()
-            .map(OperatorSpec::getOpId).collect(Collectors.toSet());
-
-        updateOperatorGraphJson(operatorSpec, opGraph);
-      });
-
-    opGraph.outputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamId -> {
-        StreamJson outputJson = new StreamJson();
-        outputJson.streamId = streamId;
-        opGraph.outputStreams.add(outputJson);
-      });
-    return opGraph;
-  }
-
-  /**
-   * Traverse the {@link OperatorSpec} graph recursively and update the operator graph JSON POJO.
-   * @param operatorSpec input
-   * @param opGraph operator graph to build
-   */
   private void updateOperatorGraphJson(OperatorSpec operatorSpec, OperatorGraphJson opGraph) {
-    // TODO xiliu: render input operators instead of input streams
-    if (operatorSpec.getOpCode() != OpCode.INPUT) {
+    if (operatorSpec == null) {
+      // task application may not have any defined OperatorSpec
+      return;
+    }
+    if (operatorSpec.getOpCode() != OperatorSpec.OpCode.INPUT) {
       opGraph.operators.put(operatorSpec.getOpId(), operatorToMap(operatorSpec));
     }
     Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
@@ -243,6 +200,46 @@ import org.codehaus.jackson.map.ObjectMapper;
   }
 
   /**
+   * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.application.ApplicationDescriptorImpl}
+   * for this job.
+   *
+   * @param jobNode job node in the {@link JobGraph}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
+   */
+  private JobNodeJson buildJobNodeJson(JobNode jobNode) {
+    JobNodeJson job = new JobNodeJson();
+    job.jobName = jobNode.getJobName();
+    job.jobId = jobNode.getJobId();
+    job.operatorGraph = buildOperatorGraphJson(jobNode);
+    return job;
+  }
+
+  /**
+   * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
+   * @param jobNode job node in the {@link JobGraph}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
+   */
+  private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
+    OperatorGraphJson opGraph = new OperatorGraphJson();
+    opGraph.inputStreams = new ArrayList<>();
+    jobNode.getInEdges().values().forEach(inStream -> {
+        StreamJson inputJson = new StreamJson();
+        opGraph.inputStreams.add(inputJson);
+        inputJson.streamId = inStream.getStreamSpec().getId();
+        inputJson.nextOperatorIds = jobNode.getNextOperatorIds(inputJson.streamId);
+        updateOperatorGraphJson(jobNode.getInputOperator(inputJson.streamId), opGraph);
+      });
+
+    opGraph.outputStreams = new ArrayList<>();
+    jobNode.getOutEdges().values().forEach(outStream -> {
+        StreamJson outputJson = new StreamJson();
+        outputJson.streamId = outStream.getStreamSpec().getId();
+        opGraph.outputStreams.add(outputJson);
+      });
+    return opGraph;
+  }
+
+  /**
    * Get or create the JSON POJO for a {@link StreamEdge}
    * @param edge {@link StreamEdge}
    * @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
@@ -261,15 +258,11 @@ import org.codehaus.jackson.map.ObjectMapper;
       edgeJson.streamSpec = streamSpecJson;
 
       List<String> sourceJobs = new ArrayList<>();
-      edge.getSourceNodes().forEach(jobNode -> {
-          sourceJobs.add(jobNode.getJobName());
-        });
+      edge.getSourceNodes().forEach(jobNode -> sourceJobs.add(jobNode.getJobName()));
       edgeJson.sourceJobs = sourceJobs;
 
       List<String> targetJobs = new ArrayList<>();
-      edge.getTargetNodes().forEach(jobNode -> {
-          targetJobs.add(jobNode.getJobName());
-        });
+      edge.getTargetNodes().forEach(jobNode -> targetJobs.add(jobNode.getJobName()));
       edgeJson.targetJobs = targetJobs;
 
       streamEdges.put(streamId, edgeJson);
@@ -285,12 +278,7 @@ import org.codehaus.jackson.map.ObjectMapper;
    */
   private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) {
     String tableId = tableSpec.getId();
-    TableSpecJson tableSpecJson = tableSpecs.get(tableId);
-    if (tableSpecJson == null) {
-      tableSpecJson = buildTableJson(tableSpec);
-      tableSpecs.put(tableId, tableSpecJson);
-    }
-    return tableSpecJson;
+    return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableSpec));
   }
 
   /**