You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:14 UTC

[8/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
new file mode 100644
index 0000000..acf4bf7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Static helper that turns a user-supplied {@link SamzaApplication} plus its {@link Config} into the matching
+ * {@link ApplicationDescriptorImpl} (stream-based or task-based).
+ */
+public class ApplicationDescriptorUtil {
+
+  private ApplicationDescriptorUtil() {
+    // static helpers only; never instantiated
+  }
+
+  /**
+   * Builds the {@link ApplicationDescriptorImpl} that corresponds to the concrete type of {@code app}.
+   * A {@link StreamApplication} yields a {@link StreamApplicationDescriptorImpl}; otherwise a {@link TaskApplication}
+   * yields a {@link TaskApplicationDescriptorImpl}. If {@code app} implements both, the stream variant wins.
+   *
+   * @param app an implementation of {@link SamzaApplication}. The {@code app} has to have a proper fully-qualified class name.
+   * @param config the {@link Config} for the application
+   * @return the {@link ApplicationDescriptorImpl} instance containing the processing logic and the config
+   * @throws IllegalArgumentException if {@code app} is neither a {@link StreamApplication} nor a {@link TaskApplication}
+   */
+  public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) {
+    final ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor;
+    if (app instanceof StreamApplication) {
+      descriptor = new StreamApplicationDescriptorImpl((StreamApplication) app, config);
+    } else if (app instanceof TaskApplication) {
+      descriptor = new TaskApplicationDescriptorImpl((TaskApplication) app, config);
+    } else {
+      // Only the two application flavours above are understood by the runtime.
+      throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication "
+          + "and TaskApplication are supported.", app.getClass().getName()));
+    }
+    return descriptor;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
new file mode 100644
index 0000000..b39ad3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.TaskConfig;
+import scala.Option;
+
+
+/**
+ * Util class to create {@link SamzaApplication} from the configuration.
+ */
+public class ApplicationUtil {
+
+  /**
+   * Creates the {@link SamzaApplication} object from the task or application class name specified in {@code config}
+   *
+   * @param config the configuration of the application
+   * @return the {@link SamzaApplication} object
+   */
+  public static SamzaApplication fromConfig(Config config) {
+    String appClassName = new ApplicationConfig(config).getAppClass();
+    if (StringUtils.isNotBlank(appClassName)) {
+      // app.class is configured
+      try {
+        Class<SamzaApplication> appClass = (Class<SamzaApplication>) Class.forName(appClassName);
+        if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) {
+          return appClass.newInstance();
+        }
+      } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+        throw new ConfigException(String.format("Loading app.class %s failed. The user application has to implement "
+            + "StreamApplication or TaskApplication.", appClassName), e);
+      }
+    }
+    // no app.class defined. It has to be a legacy application with task.class configuration
+    Option<String> taskClassOption = new TaskConfig(config).getTaskClass();
+    if (!taskClassOption.isDefined() || !StringUtils.isNotBlank(taskClassOption.getOrElse(null))) {
+      // no task.class defined either. This is wrong.
+      throw new ConfigException("Legacy task applications must set a non-empty task.class in configuration.");
+    }
+    return new LegacyTaskApplication(taskClassOption.get());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
new file mode 100644
index 0000000..e9e2635
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.task.TaskFactoryUtil;
+
+/**
+ * {@link TaskApplication} used for legacy jobs that only specify a task class (task.class). When described, it
+ * registers a task factory for that class on the application descriptor.
+ */
+public final class LegacyTaskApplication implements TaskApplication {
+  // Task class name supplied at construction; resolved into a task factory in describe().
+  private final String taskClassName;
+
+  /**
+   * @param className name of the user task class to build the task factory from
+   */
+  public LegacyTaskApplication(String className) {
+    taskClassName = className;
+  }
+
+  @Override
+  public void describe(TaskApplicationDescriptor descriptor) {
+    descriptor.setTaskFactory(TaskFactoryUtil.getTaskFactory(this.taskClassName));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
new file mode 100644
index 0000000..ae7a45d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines:
+ * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
+ * and {@link Table} to create the DAG of transforms.
+ * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
+ */
+public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
+    implements StreamApplicationDescriptor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
+  // Format that user-provided table IDs and operator IDs must match (no spaces or special characters).
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  // Descriptors registered by the user application, keyed by streamId / tableId / system name.
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+  // We use a LHM for deterministic order in initializing and closing operators.
+  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
+  // Every operator ID handed out by getNextOpId, used to reject duplicates.
+  private final Set<String> operatorIds = new HashSet<>();
+
+  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
+
+  /**
+   * The 0-based position of the next operator in the graph.
+   * Part of the unique ID for each OperatorSpec in the graph.
+   * Should only be accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+   */
+  private int nextOpNum = 0;
+
+  /**
+   * Creates the descriptor and immediately lets {@code userApp} describe its DAG against it.
+   *
+   * @param userApp the user's {@link StreamApplication}
+   * @param config the {@link Config} for the application
+   */
+  public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
+    super(userApp, config);
+    // Field initializers of this class have already run here, so describe() may call back into this descriptor.
+    userApp.describe(this);
+  }
+
+  /**
+   * Sets the default system for this application. Must be called before any input or output stream is created.
+   *
+   * @param defaultSystemDescriptor the descriptor of the default system; must not be null
+   * @return this descriptor
+   * @throws NullPointerException if {@code defaultSystemDescriptor} is null
+   * @throws IllegalStateException if input/output streams already exist, or a different descriptor instance is already
+   *         registered for the same system name
+   */
+  @Override
+  public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
+    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+        "Default system must be set before creating any input or output streams.");
+    addSystemDescriptor(defaultSystemDescriptor);
+
+    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
+    return this;
+  }
+
+  /**
+   * Creates a {@link MessageStream} for {@code inputDescriptor}. If the descriptor's system provides a
+   * {@link StreamExpander}, stream creation is delegated to it entirely.
+   *
+   * @param inputDescriptor the descriptor of the input stream
+   * @param <M> the type of messages in the stream
+   * @return the input {@link MessageStream}
+   * @throws IllegalStateException if the streamId was already used as an input, the system descriptor conflicts with
+   *         an already-registered one, or the stream is also an output with different serdes
+   */
+  @Override
+  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
+    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
+    Optional<StreamExpander> expander = systemDescriptor.getExpander();
+    if (expander.isPresent()) {
+      return expander.get().apply(this, inputDescriptor);
+    }
+
+    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
+        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
+    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+
+    String streamId = inputDescriptor.getStreamId();
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
+        "getInputStream must not be called multiple times with the same streamId: " + streamId);
+
+    Serde serde = inputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    if (outputStreams.containsKey(streamId)) {
+      // Serdes are applied per stream, so a stream used for both input and output must agree on them.
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
+      Serde keySerde = outputStream.getKeySerde();
+      Serde valueSerde = outputStream.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl(this, inputOperators.get(streamId));
+  }
+
+  /**
+   * Creates an {@link OutputStream} for {@code outputDescriptor}.
+   *
+   * @param outputDescriptor the descriptor of the output stream
+   * @param <M> the type of messages in the stream
+   * @return the {@link OutputStream}
+   * @throws IllegalStateException if the streamId was already used as an output, the system descriptor conflicts with
+   *         an already-registered one, or the stream is also an input with different serdes
+   */
+  @Override
+  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
+    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
+        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
+    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+
+    String streamId = outputDescriptor.getStreamId();
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
+        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
+
+    Serde serde = outputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    if (inputOperators.containsKey(streamId)) {
+      // Serdes are applied per stream, so a stream used for both input and output must agree on them.
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
+      Serde keySerde = inputOperatorSpec.getKeySerde();
+      Serde valueSerde = inputOperatorSpec.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
+  }
+
+  /**
+   * Registers {@code tableDescriptor} and returns the corresponding {@link Table}.
+   *
+   * @param tableDescriptor the descriptor of the table
+   * @param <K> the type of the table key
+   * @param <V> the type of the table value
+   * @return the {@link Table}
+   * @throws IllegalStateException if the tableId is blank or does not match {@code ID_PATTERN}, or the table was
+   *         already registered
+   */
+  @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
+    String tableId = tableDescriptor.getTableId();
+    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
+        String.format("tableId: %s must conform to pattern: %s", tableId, ID_PATTERN.toString()));
+    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
+        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
+
+    TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
+    if (tables.containsKey(tableSpec)) {
+      throw new IllegalStateException(
+          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
+    }
+    tables.put(tableSpec, new TableImpl(tableSpec));
+    return tables.get(tableSpec);
+  }
+
+  /**
+   * Get all the {@link InputDescriptor}s to this application
+   *
+   * @return an immutable map of streamId to {@link InputDescriptor}
+   */
+  @Override
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  /**
+   * Get all the {@link OutputDescriptor}s from this application
+   *
+   * @return an immutable map of streamId to {@link OutputDescriptor}
+   */
+  @Override
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  /**
+   * Get all the broadcast streamIds from this application
+   *
+   * @return an immutable set of streamIds
+   */
+  @Override
+  public Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
+  /**
+   * Get all the {@link TableDescriptor}s in this application
+   *
+   * @return an immutable set of {@link TableDescriptor}s
+   */
+  @Override
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
+
+  /**
+   * Get all the unique {@link SystemDescriptor}s in this application
+   *
+   * @return an immutable set of {@link SystemDescriptor}s
+   */
+  @Override
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor instances for the same system name
+    // when getting an input/output stream or setting the default system descriptor
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
+
+  /**
+   * Get the default {@link SystemDescriptor} in this application
+   *
+   * @return the default {@link SystemDescriptor}, or empty if none was set via {@link #withDefaultSystem}
+   */
+  @Override
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    return defaultSystemDescriptorOptional;
+  }
+
+  /**
+   * @return a new {@link OperatorSpecGraph} built from this descriptor
+   */
+  public OperatorSpecGraph getOperatorSpecGraph() {
+    return new OperatorSpecGraph(this);
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+   * <p>
+   * The op code is lower-cased with {@link Locale#ROOT} so the generated ID does not depend on the JVM's default
+   * locale (e.g. under a Turkish locale "INPUT" would otherwise become "ınput").
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @param userDefinedId the optional user-provided name of the next operator or null
+   * @return the unique ID for the next operator in the graph
+   * @throws SamzaException if {@code userDefinedId} contains disallowed characters or the resulting ID is a duplicate
+   */
+  public String getNextOpId(OpCode opCode, String userDefinedId) {
+    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
+    }
+
+    String nextOpId = String.format("%s-%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        opCode.name().toLowerCase(Locale.ROOT),
+        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+    if (!operatorIds.add(nextOpId)) {
+      throw new SamzaException(
+          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+    }
+    nextOpNum++;
+    return nextOpId;
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-nextOpNum;
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @return the unique ID for the next operator in the graph
+   */
+  public String getNextOpId(OpCode opCode) {
+    return getNextOpId(opCode, null);
+  }
+
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
+  }
+
+  public Map<String, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
+  }
+
+  public Map<TableSpec, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
+  /**
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
+   *
+   * @param streamId the id of the stream to be created.
+   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
+   *              is used.
+   * @param isBroadcast whether the stream is a broadcast stream.
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return  the intermediate {@link MessageStreamImpl}
+   * @throws IllegalStateException if {@code streamId} is already used as an input or output stream
+   */
+  @VisibleForTesting
+  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
+        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
+
+    if (serde == null) {
+      LOGGER.info("No serde provided for intermediate stream: {}. Key and message serdes configured for the "
+          + "job.default.system will be used.", streamId);
+    }
+
+    if (isBroadcast) {
+      broadcastStreams.add(streamId);
+    }
+
+    boolean isKeyed;
+    KV<Serde, Serde> kvSerdes;
+    if (serde == null) { // if no explicit serde available
+      isKeyed = true; // assume keyed stream
+      kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
+    } else {
+      isKeyed = serde instanceof KVSerde;
+      kvSerdes = getKVSerdes(streamId, serde);
+    }
+
+    InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
+        .flatMap(SystemDescriptor::getTransformer).orElse(null);
+
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
+  }
+
+  // Splits a serde into (key serde, value serde); a non-KV serde gets a NoOpSerde key.
+  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
+    Serde keySerde, valueSerde;
+
+    if (serde instanceof KVSerde) {
+      keySerde = ((KVSerde) serde).getKeySerde();
+      valueSerde = ((KVSerde) serde).getValueSerde();
+    } else {
+      keySerde = new NoOpSerde();
+      valueSerde = serde;
+    }
+
+    if (keySerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the key serde for stream {}. Keys will not be (de)serialized", streamId);
+    }
+    if (valueSerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the value serde for stream {}. Values will not be (de)serialized", streamId);
+    }
+
+    return KV.of(keySerde, valueSerde);
+  }
+
+  // check uniqueness of the {@code systemDescriptor} and add if it is unique
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
new file mode 100644
index 0000000..3597d7c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ * This class implements interface {@link TaskApplicationDescriptor}.
+ * <p>
+ * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes
+ * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams,
+ * and the list of {@link TableDescriptor}s used in the application.
+ */
+public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
+    implements TaskApplicationDescriptor {
+
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+
+  private TaskFactory taskFactory = null;
+
+  public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
+    super(userApp, config);
+    userApp.describe(this);
+  }
+
+  @Override
+  public void setTaskFactory(TaskFactory factory) {
+    this.taskFactory = factory;
+  }
+
+  @Override
+  public void addInputStream(InputDescriptor inputDescriptor) {
+    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
+        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
+    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+  }
+
+  @Override
+  public void addOutputStream(OutputDescriptor outputDescriptor) {
+    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
+        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
+    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+  }
+
+  @Override
+  public void addTable(TableDescriptor tableDescriptor) {
+    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
+        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
+  }
+
+  @Override
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  @Override
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  @Override
+  public Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
+  @Override
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
+
+  @Override
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor instances for the same system name
+    // when getting an input/output stream or setting the default system descriptor
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
+
+  /**
+   * Get the user-defined {@link TaskFactory}
+   * @return the {@link TaskFactory} object
+   */
+  public TaskFactory getTaskFactory() {
+    return taskFactory;
+  }
+
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
index fe8bc66..5b3636b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -19,34 +19,40 @@
 package org.apache.samza.container;
 
 /**
- * A Listener for {@link org.apache.samza.container.SamzaContainer} lifecycle events.
+ * A Listener for {@link SamzaContainer} lifecycle events.
  */
 public interface SamzaContainerListener {
 
   /**
-   *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to
+   * Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED}
+   * and is about to transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence.
+   */
+  void beforeStart();
+
+  /**
+   *  Method invoked after the {@link SamzaContainer} has successfully transitioned to
    *  the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the
    *  {@link org.apache.samza.container.RunLoop}
    */
-  void onContainerStart();
+  void afterStart();
 
   /**
-   *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to
+   *  Method invoked after the {@link SamzaContainer} has successfully transitioned to
    *  {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
    *  {@link org.apache.samza.SamzaContainerStatus}
    *  <br>
    *  <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
    *  exceptions/errors.
    */
-  void onContainerStop();
+  void afterStop();
 
   /**
-   *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has  transitioned to
+   *  Method invoked after the {@link SamzaContainer} has transitioned to
    *  {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
    *  {@link org.apache.samza.SamzaContainerStatus}
    *  <br>
-   *  <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop()}.
+   *  <b>Note</b>: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}.
    *  @param t Throwable that caused the container failure.
    */
-  void onContainerFailed(Throwable t);
+  void afterFailure(Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index e95ed26..ea892fe 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -47,9 +47,10 @@ import static org.apache.samza.util.StreamUtil.*;
 
 
 /**
- * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
+ * The ExecutionPlanner creates the physical execution graph for the {@link OperatorSpecGraph}, and
  * the intermediate topics needed for the execution.
  */
+// TODO: ExecutionPlanner needs to be able to generate single node JobGraph for low-level TaskApplication as well (SAMZA-1811)
 public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
@@ -63,7 +64,7 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public ExecutionPlan plan(OperatorSpecGraph specGraph) throws Exception {
+  public ExecutionPlan plan(OperatorSpecGraph specGraph) {
     validateConfig();
 
     // create physical job graph based on stream graph
@@ -91,7 +92,7 @@ public class ExecutionPlanner {
   }
 
   /**
-   * Create the physical graph from StreamGraph
+   * Create the physical graph from {@link OperatorSpecGraph}
    */
   /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
     JobGraph jobGraph = new JobGraph(config, specGraph);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 2f210f2..f49e6db 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -195,12 +195,6 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
-   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
-   * @param streamSpec spec of the StreamEdge
-   * @return stream edge
-   */
-
-  /**
    * Returns the job nodes to be executed in the topological order
    * @return unmodifiable list of {@link JobNode}
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
new file mode 100644
index 0000000..a2050e5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.table.TableConfigGenerator;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a temporary helper class to include all common logic to generate {@link JobConfig}s for high- and low-level
+ * applications in {@link org.apache.samza.runtime.LocalApplicationRunner} and {@link org.apache.samza.runtime.RemoteApplicationRunner}.
+ *
+ * TODO: Fix SAMZA-1811 to consolidate this class with {@link ExecutionPlanner}
+ */
+public abstract class JobPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class);
+
+  protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
+  protected final Config config;
+
+  JobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+    this.appDesc = descriptor;
+    this.config = descriptor.getConfig();
+  }
+
+  public List<JobConfig> prepareJobs() {
+    String appId = new ApplicationConfig(appDesc.getConfig()).getGlobalAppId();
+    if (appDesc instanceof TaskApplicationDescriptorImpl) {
+      return Collections.singletonList(prepareTaskJob((TaskApplicationDescriptorImpl) appDesc));
+    } else if (appDesc instanceof StreamApplicationDescriptorImpl) {
+      try {
+        return prepareStreamJobs((StreamApplicationDescriptorImpl) appDesc);
+      } catch (Exception e) {
+        throw new SamzaException("Failed to generate JobConfig for StreamApplication " + appId, e);
+      }
+    }
+    throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
+        + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName()));
+  }
+
+  abstract List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception;
+
+  StreamManager buildAndStartStreamManager(Config config) {
+    StreamManager streamManager = new StreamManager(config);
+    streamManager.start();
+    return streamManager;
+  }
+
+  ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph) {
+    return getExecutionPlan(specGraph, null);
+  }
+
+  /* package private */
+  ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph, String runId) {
+
+    // update application configs
+    Map<String, String> cfg = new HashMap<>();
+    if (StringUtils.isNoneEmpty(runId)) {
+      cfg.put(ApplicationConfig.APP_RUN_ID, runId);
+    }
+
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    inputStreams.removeAll(specGraph.getOutputStreams().keySet());
+    ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
+        ? ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM;
+    cfg.put(ApplicationConfig.APP_MODE, mode.name());
+
+    // merge user-provided configuration with input/output descriptor generated configuration
+    // descriptor generated configuration has higher priority
+    Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(appDesc);
+    cfg.putAll(systemStreamConfigs);
+
+    // adding app.class in the configuration
+    cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
+
+    // create the physical execution plan and merge with overrides. This works for a single-stage job now
+    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
+    Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg));
+    // creating the StreamManager to get all input/output streams' metadata for planning
+    StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
+    try {
+      ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager);
+      return planner.plan(specGraph);
+    } finally {
+      streamManager.stop();
+    }
+  }
+
+  /**
+   * Write the execution plan JSON to a file
+   * @param planJson JSON representation of the plan
+   */
+  final void writePlanJsonFile(String planJson) {
+    try {
+      String content = "plan='" + planJson + "'";
+      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
+      if (planPath != null && !planPath.isEmpty()) {
+        // Write the plan json to plan path
+        File file = new File(planPath + "/plan.json");
+        file.setReadable(true, false);
+        PrintWriter writer = new PrintWriter(file, "UTF-8");
+        writer.println(content);
+        writer.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to write execution plan json to file", e);
+    }
+  }
+
+  // TODO: SAMZA-1814: the following configuration generation still misses serde configuration generation,
+  // side input configuration, broadcast input and task inputs configuration generation for low-level task
+  // applications
+  // helper method to generate a single node job configuration for low level task applications
+  private JobConfig prepareTaskJob(TaskApplicationDescriptorImpl taskAppDesc) {
+    // copy original configure
+    Map<String, String> cfg = new HashMap<>();
+    // expand system and streams configure
+    Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(taskAppDesc);
+    cfg.putAll(systemStreamConfigs);
+    // expand table configure
+    cfg.putAll(expandTableConfigs(cfg, taskAppDesc));
+    // adding app.class in the configuration
+    cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
+    // create the physical execution plan and merge with overrides. This works for a single-stage job now
+    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
+    return new JobConfig(JobNode.mergeJobConfig(config, new MapConfig(cfg)));
+  }
+
+  private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    Map<String, String> systemStreamConfigs = new HashMap<>();
+    appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
+    appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
+    appDesc.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
+    appDesc.getDefaultSystemDescriptor().ifPresent(dsd ->
+        systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName()));
+    return systemStreamConfigs;
+  }
+
+  private Map<String, String> expandTableConfigs(Map<String, String> originConfig,
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    List<TableSpec> tableSpecs = new ArrayList<>();
+    appDesc.getTableDescriptors().stream().map(td -> ((BaseTableDescriptor) td).getTableSpec())
+        .forEach(spec -> tableSpecs.add(spec));
+    return TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(originConfig), tableSpecs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
new file mode 100644
index 0000000..7996d6b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)}
+ * for standalone Samza processors.
+ *
+ * TODO: we need to consolidate this with {@link ExecutionPlanner} after SAMZA-1811.
+ */
+public class LocalJobPlanner extends JobPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class);
+  private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData";
+
+  // identifies this processor when coordinating intermediate stream creation
+  private final String uid = UUID.randomUUID().toString();
+
+  public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+    super(descriptor);
+  }
+
+  /**
+   * Plans the high-level DAG, writes and logs the plan, and creates the intermediate streams of the plan.
+   *
+   * @param streamAppDesc the stream application to plan
+   * @return the job configs of the plan
+   * @throws SamzaException if the plan contains no jobs, or the stream-creation lock cannot be acquired in time
+   */
+  @Override
+  List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception {
+    // for high-level DAG, generating the plan and job configs
+    // 1. initialize and plan
+    ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph());
+
+    String executionPlanJson = plan.getPlanAsJson();
+    writePlanJsonFile(executionPlanJson);
+    LOG.info("Execution Plan: \n{}", executionPlanJson);
+    // identical plans hash to the same id, which is what the processors coordinate on
+    String planId = String.valueOf(executionPlanJson.hashCode());
+
+    if (plan.getJobConfigs().isEmpty()) {
+      throw new SamzaException("No jobs in the plan.");
+    }
+
+    // 2. create the necessary streams
+    // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
+    // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
+    // to be used for the whole application
+    JobConfig jobConfig = plan.getJobConfigs().get(0);
+    StreamManager streamManager = null;
+    try {
+      // create the StreamManager to create intermediate streams in the plan
+      streamManager = buildAndStartStreamManager(jobConfig);
+      createStreams(planId, plan.getIntermediateStreams(), streamManager);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
+    }
+    return plan.getJobConfigs();
+  }
+
+  /**
+   * Create intermediate streams using {@link StreamManager}.
+   * If the configured coordination utils factory returns a {@link CoordinationUtils}, this function first competes for
+   * a distributed lock keyed by {@code planId}; only the lock holder creates the streams and then marks them as created.
+   * If no {@link CoordinationUtils} is available, every processor attempts stream creation itself.
+   * @param planId a unique identifier representing the plan used for coordination purpose
+   * @param intStreams list of intermediate {@link StreamSpec}s
+   * @param streamManager the {@link StreamManager} used to create streams
+   * @throws SamzaException if the lock could not be acquired before timing out
+   */
+  private void createStreams(String planId, List<StreamSpec> intStreams, StreamManager streamManager) {
+    if (intStreams.isEmpty()) {
+      LOG.info("Set of intermediate streams is empty. Nothing to create.");
+      return;
+    }
+    LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
+    // Move the scope of coordination utils within stream creation to address long idle connection problem.
+    // Refer SAMZA-1385 for more details
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+    String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
+    CoordinationUtils coordinationUtils =
+        jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config);
+    if (coordinationUtils == null) {
+      LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
+      // each application process will try creating the streams, which
+      // requires stream creation to be idempotent
+      streamManager.createStreams(intStreams);
+      return;
+    }
+
+    DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId);
+    try {
+      // check if the processor needs to go through leader election and stream creation
+      if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
+        LOG.info("lock acquired for streams creation by {}", uid);
+        streamManager.createStreams(intStreams);
+        lockWithState.unlockAndSet();
+      } else {
+        LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
+      }
+    } catch (TimeoutException e) {
+      // String.format uses %s placeholders; SLF4J-style {} would leave the uid out of the message
+      String msg = String.format("Processor %s failed to get the lock for stream initialization", uid);
+      throw new SamzaException(msg, e);
+    } finally {
+      coordinationUtils.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
new file mode 100644
index 0000000..254ff97
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)}
+ * for remote-launched Samza processors (e.g. in YARN).
+ *
+ * TODO: we need to consolidate this class with {@link ExecutionPlanner} after SAMZA-1811.
+ */
+public class RemoteJobPlanner extends JobPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteJobPlanner.class);
+
+  public RemoteJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+    super(descriptor);
+  }
+
+  /**
+   * Plans the high-level DAG with a freshly generated run id, writes the plan, and creates the intermediate streams.
+   * In BATCH mode, streams left over from the previous run (as recorded in the coordinator stream) are cleared first.
+   *
+   * @param streamAppDesc the stream application to plan
+   * @return the job configs of the plan
+   * @throws SamzaException if the plan contains no jobs
+   */
+  @Override
+  List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception {
+    // for high-level DAG, generate the plan and job configs
+    // TODO: run.id needs to be set for standalone: SAMZA-1531
+    // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
+    String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
+    LOG.info("The run id for this run is {}", runId);
+
+    // 1. initialize and plan
+    ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph(), runId);
+    writePlanJsonFile(plan.getPlanAsJson());
+
+    if (plan.getJobConfigs().isEmpty()) {
+      throw new SamzaException("No jobs in the plan.");
+    }
+
+    // 2. create the necessary streams
+    // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
+    // to be used for the whole application
+    JobConfig jobConfig = plan.getJobConfigs().get(0);
+    StreamManager streamManager = null;
+    try {
+      // create the StreamManager to create intermediate streams in the plan
+      streamManager = buildAndStartStreamManager(jobConfig);
+      if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
+        streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
+      }
+      streamManager.createStreams(plan.getIntermediateStreams());
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
+    }
+    return plan.getJobConfigs();
+  }
+
+  /**
+   * Reads the configuration of the previous run by bootstrapping the coordinator stream.
+   *
+   * @return the config read from the coordinator stream
+   */
+  private Config getConfigFromPrevRun() {
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
+    consumer.register();
+    consumer.start();
+    try {
+      consumer.bootstrap();
+    } finally {
+      // always release the consumer, even when bootstrapping fails
+      consumer.stop();
+    }
+
+    Config cfg = consumer.getConfig();
+    LOG.info("Previous config is: {}", cfg);
+    return cfg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 3c1a1dc..5411af3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,10 +19,12 @@
 
 package org.apache.samza.operators;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.util.Collection;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -53,7 +55,7 @@ import org.apache.samza.table.TableSpec;
 
 /**
  * The {@link MessageStream} implementation that lets users describe their logical DAG.
- * Users can obtain an instance by calling {@link StreamGraph#getInputStream}.
+ * Users can obtain an instance by calling {@link StreamApplicationDescriptorImpl#getInputStream}.
  * <p>
  * Each {@link MessageStreamImpl} is associated with a single {@link OperatorSpec} in the DAG and allows
  * users to chain further operators on its {@link OperatorSpec}. In other words, a {@link MessageStreamImpl}
@@ -63,54 +65,54 @@ import org.apache.samza.table.TableSpec;
  */
 public class MessageStreamImpl<M> implements MessageStream<M> {
   /**
-   * The {@link StreamGraphSpec} that contains this {@link MessageStreamImpl}
+   * The {@link StreamApplicationDescriptorImpl} that contains this {@link MessageStreamImpl}
    */
-  private final StreamGraphSpec graph;
+  private final StreamApplicationDescriptorImpl streamAppDesc;
 
   /**
    * The {@link OperatorSpec} associated with this {@link MessageStreamImpl}
    */
   private final OperatorSpec operatorSpec;
 
-  public MessageStreamImpl(StreamGraphSpec graph, OperatorSpec<?, M> operatorSpec) {
-    this.graph = graph;
+  public MessageStreamImpl(StreamApplicationDescriptorImpl streamAppDesc, OperatorSpec<?, M> operatorSpec) {
+    this.streamAppDesc = streamAppDesc;
     this.operatorSpec = operatorSpec;
   }
 
   @Override
   public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
-    String opId = this.graph.getNextOpId(OpCode.MAP);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.MAP);
     StreamOperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
-    return new MessageStreamImpl<>(this.graph, op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
   public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
-    String opId = this.graph.getNextOpId(OpCode.FILTER);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.FILTER);
     StreamOperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
-    return new MessageStreamImpl<>(this.graph, op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
-    String opId = this.graph.getNextOpId(OpCode.FLAT_MAP);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.FLAT_MAP);
     StreamOperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
-    return new MessageStreamImpl<>(this.graph, op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
   public void sink(SinkFunction<? super M> sinkFn) {
-    String opId = this.graph.getNextOpId(OpCode.SINK);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.SINK);
     SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 
   @Override
   public void sendTo(OutputStream<M> outputStream) {
-    String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
     OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
         (OutputStreamImpl<M>) outputStream, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
@@ -118,10 +120,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
-    String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.WINDOW, userDefinedId);
     OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
-    return new MessageStreamImpl<>(this.graph, op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
@@ -130,7 +132,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
       Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
       Duration ttl, String userDefinedId) {
     if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
-    String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN, userDefinedId);
     OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
     JoinOperatorSpec<K, M, OM, JM> op =
         OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, keySerde,
@@ -138,35 +140,35 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
     this.operatorSpec.registerNextOperatorSpec(op);
     otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) op);
 
-    return new MessageStreamImpl<>(this.graph, op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
   public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
       StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
-    String opId = this.graph.getNextOpId(OpCode.JOIN);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN);
     TableSpec tableSpec = ((TableImpl) table).getTableSpec();
     StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
         tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
     this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
-    return new MessageStreamImpl<>(this.graph, joinOpSpec);
+    return new MessageStreamImpl<>(this.streamAppDesc, joinOpSpec);
   }
 
   @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
     if (otherStreams.isEmpty()) return this;
-    String opId = this.graph.getNextOpId(OpCode.MERGE);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.MERGE);
     StreamOperatorSpec<M, M> op = OperatorSpecs.createMergeOperatorSpec(opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(op));
-    return new MessageStreamImpl<>(this.graph, op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
   public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
       MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
-    String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
-    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde, false);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
+    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.streamAppDesc.getIntermediateStream(opId, serde, false);
     if (!intermediateStream.isKeyed()) {
       // this can only happen when the default serde partitionBy variant is being used
       throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
@@ -185,7 +187,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <K, V> void sendTo(Table<KV<K, V>> table) {
-    String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
     SendToTableOperatorSpec<K, V> op =
         OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId);
     this.operatorSpec.registerNextOperatorSpec(op);
@@ -193,8 +195,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public MessageStream<M> broadcast(Serde<M> serde, String userDefinedId) {
-    String opId = this.graph.getNextOpId(OpCode.BROADCAST, userDefinedId);
-    IntermediateMessageStreamImpl<M> intermediateStream = this.graph.getIntermediateStream(opId, serde, true);
+    String opId = this.streamAppDesc.getNextOpId(OpCode.BROADCAST, userDefinedId);
+    IntermediateMessageStreamImpl<M> intermediateStream = this.streamAppDesc.getIntermediateStream(opId, serde, true);
     BroadcastOperatorSpec<M> broadcastOperatorSpec =
         OperatorSpecs.createBroadCastOperatorSpec(intermediateStream.getOutputStream(), opId);
     this.operatorSpec.registerNextOperatorSpec(broadcastOperatorSpec);
@@ -210,7 +212,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
    * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
    * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
    */
-  protected OperatorSpec<?, M> getOperatorSpec() {
+  @VisibleForTesting
+  public OperatorSpec<?, M> getOperatorSpec() {
     return this.operatorSpec;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index b6c3dae..b75b1e8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -33,10 +34,11 @@ import org.apache.samza.table.TableSpec;
 
 
 /**
- * Defines the serialized format of {@link StreamGraphSpec}. This class encapsulates all getter methods to get the {@link OperatorSpec}
- * initialized in the {@link StreamGraphSpec} and constructsthe corresponding serialized instances of {@link OperatorSpec}.
- * The {@link StreamGraphSpec} and {@link OperatorSpec} instances included in this class are considered as immutable and read-only.
- * The instance of {@link OperatorSpecGraph} should only be used in runtime to construct {@link org.apache.samza.task.StreamOperatorTask}.
+ * Defines the serialized format of the operator graph in {@link StreamApplicationDescriptorImpl}. This class encapsulates all
+ * getter methods to get the {@link OperatorSpec}s initialized in the {@link StreamApplicationDescriptorImpl} and constructs the
+ * corresponding serialized instances of {@link OperatorSpec}. The {@link OperatorSpec}s and other graph state copied from the
+ * {@link StreamApplicationDescriptorImpl} into this class are considered immutable and read-only. An instance of
+ * {@link OperatorSpecGraph} should only be used at runtime to construct {@link org.apache.samza.task.StreamOperatorTask}.
  */
 public class OperatorSpecGraph implements Serializable {
   // We use a LHM for deterministic order in initializing and closing operators.
@@ -51,11 +53,11 @@ public class OperatorSpecGraph implements Serializable {
   private transient final SerializableSerde<OperatorSpecGraph> opSpecGraphSerde = new SerializableSerde<>();
   private transient final byte[] serializedOpSpecGraph;
 
-  OperatorSpecGraph(StreamGraphSpec graphSpec) {
-    this.inputOperators = graphSpec.getInputOperators();
-    this.outputStreams = graphSpec.getOutputStreams();
-    this.broadcastStreams = graphSpec.getBroadcastStreams();
-    this.tables = graphSpec.getTables();
+  public OperatorSpecGraph(StreamApplicationDescriptorImpl streamAppDesc) {
+    this.inputOperators = streamAppDesc.getInputOperators();
+    this.outputStreams = streamAppDesc.getOutputStreams();
+    this.broadcastStreams = streamAppDesc.getBroadcastStreams();
+    this.tables = streamAppDesc.getTables();
     this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
     this.hasWindowOrJoins = checkWindowOrJoins();
     this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
@@ -78,7 +80,7 @@ public class OperatorSpecGraph implements Serializable {
   }
 
   /**
-   * Get all {@link OperatorSpec}s available in this {@link StreamGraphSpec}
+   * Get all {@link OperatorSpec}s available in this operator graph
    *
    * @return all available {@link OperatorSpec}s
    */
@@ -87,9 +89,9 @@ public class OperatorSpecGraph implements Serializable {
   }
 
   /**
-   * Returns <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator
+   * Returns <tt>true</tt> iff this operator graph contains a join or a window operator
    *
-   * @return  <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator
+   * @return  <tt>true</tt> iff this operator graph contains a join or a window operator
    */
   public boolean hasWindowOrJoins() {
     return hasWindowOrJoins;

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
deleted file mode 100644
index 8eb2425..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class defines:
- * 1) an implementation of {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
- * create the DAG of transforms.
- * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
- */
-public class StreamGraphSpec implements StreamGraph {
-  private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class);
-  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
-  // We use a LHM for deterministic order in initializing and closing operators.
-  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
-  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
-  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
-  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
-  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-  private final Set<String> broadcastStreams = new HashSet<>();
-  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
-  private final Config config;
-
-  /**
-   * The 0-based position of the next operator in the graph.
-   * Part of the unique ID for each OperatorSpec in the graph.
-   * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
-   */
-  private int nextOpNum = 0;
-  private final Set<String> operatorIds = new HashSet<>();
-  private ContextManager contextManager = null;
-  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
-
-  public StreamGraphSpec(Config config) {
-    this.config = config;
-  }
-
-  @Override
-  public void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
-    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
-    String defaultSystemName = defaultSystemDescriptor.getSystemName();
-    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
-        "Default system must be set before creating any input or output streams.");
-    checkSystemDescriptorUniqueness(defaultSystemDescriptor, defaultSystemName);
-    systemDescriptors.put(defaultSystemName, defaultSystemDescriptor);
-    this.defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
-  }
-
-  @Override
-  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
-    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
-    Optional<StreamExpander> expander = systemDescriptor.getExpander();
-    if (expander.isPresent()) {
-      return expander.get().apply(this, inputDescriptor);
-    }
-
-    String streamId = inputDescriptor.getStreamId();
-    Preconditions.checkState(!inputOperators.containsKey(streamId),
-        "getInputStream must not be called multiple times with the same streamId: " + streamId);
-    Preconditions.checkState(!inputDescriptors.containsKey(streamId),
-        "getInputStream must not be called multiple times with the same input descriptor: " + streamId);
-    String systemName = systemDescriptor.getSystemName();
-    checkSystemDescriptorUniqueness(systemDescriptor, systemName);
-
-    Serde serde = inputDescriptor.getSerde();
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamId)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamId);
-      Serde keySerde = outputStream.getKeySerde();
-      Serde valueSerde = outputStream.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used for both.", streamId));
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
-    InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
-            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamId, inputOperatorSpec);
-    inputDescriptors.put(streamId, inputDescriptor);
-    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
-    return new MessageStreamImpl(this, inputOperators.get(streamId));
-  }
-
-  @Override
-  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
-    String streamId = outputDescriptor.getStreamId();
-    Preconditions.checkState(!outputStreams.containsKey(streamId),
-        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
-    Preconditions.checkState(!outputDescriptors.containsKey(streamId),
-        "getOutputStream must not be called multiple times with the same output descriptor: " + streamId);
-    SystemDescriptor systemDescriptor = outputDescriptor.getSystemDescriptor();
-    String systemName = systemDescriptor.getSystemName();
-    checkSystemDescriptorUniqueness(systemDescriptor, systemName);
-
-    Serde serde = outputDescriptor.getSerde();
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamId)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
-      Serde keySerde = inputOperatorSpec.getKeySerde();
-      Serde valueSerde = inputOperatorSpec.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used for both.", streamId));
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    outputDescriptors.put(streamId, outputDescriptor);
-    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
-    return outputStreams.get(streamId);
-  }
-
-  @Override
-  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
-    String tableId = tableDescriptor.getTableId();
-    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
-        String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
-    TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
-    if (tables.containsKey(tableSpec)) {
-      throw new IllegalStateException(
-          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
-    }
-    tables.put(tableSpec, new TableImpl(tableSpec));
-    return tables.get(tableSpec);
-  }
-
-  @Override
-  public StreamGraph withContextManager(ContextManager contextManager) {
-    this.contextManager = contextManager;
-    return this;
-  }
-
-  public ContextManager getContextManager() {
-    return this.contextManager;
-  }
-
-  public OperatorSpecGraph getOperatorSpecGraph() {
-    return new OperatorSpecGraph(this);
-  }
-
-  /**
-   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
-   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
-   *
-   * @param opCode the {@link OpCode} of the next operator
-   * @param userDefinedId the optional user-provided name of the next operator or null
-   * @return the unique ID for the next operator in the graph
-   */
-  public String getNextOpId(OpCode opCode, String userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
-      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
-    }
-
-    String nextOpId = String.format("%s-%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
-        opCode.name().toLowerCase(),
-        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
-    if (!operatorIds.add(nextOpId)) {
-      throw new SamzaException(
-          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
-    }
-    nextOpNum++;
-    return nextOpId;
-  }
-
-  /**
-   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
-   * jobName-jobId-opCode-nextOpNum;
-   *
-   * @param opCode the {@link OpCode} of the next operator
-   * @return the unique ID for the next operator in the graph
-   */
-  public String getNextOpId(OpCode opCode) {
-    return getNextOpId(opCode, null);
-  }
-
-  /**
-   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
-   * An intermediate {@link MessageStream} is both an output and an input stream.
-   *
-   * @param streamId the id of the stream to be created.
-   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
-   *              is used.
-   * @param isBroadcast whether the stream is a broadcast stream.
-   * @param <M> the type of messages in the intermediate {@link MessageStream}
-   * @return  the intermediate {@link MessageStreamImpl}
-   */
-  @VisibleForTesting
-  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
-    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
-        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
-
-    if (serde == null) {
-      LOGGER.info("No serde provided for intermediate stream: " + streamId +
-          ". Key and message serdes configured for the job.default.system will be used.");
-    }
-
-    if (isBroadcast) broadcastStreams.add(streamId);
-
-    boolean isKeyed;
-    KV<Serde, Serde> kvSerdes;
-    if (serde == null) { // if no explicit serde available
-      isKeyed = true; // assume keyed stream
-      kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
-    } else {
-      isKeyed = serde instanceof KVSerde;
-      kvSerdes = getKVSerdes(streamId, serde);
-    }
-
-    InputTransformer transformer = (InputTransformer) defaultSystemDescriptorOptional
-        .flatMap(SystemDescriptor::getTransformer).orElse(null);
-
-    InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
-            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamId, inputOperatorSpec);
-    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
-  }
-
-  Map<String, InputOperatorSpec> getInputOperators() {
-    return Collections.unmodifiableMap(inputOperators);
-  }
-
-  Map<String, OutputStreamImpl> getOutputStreams() {
-    return Collections.unmodifiableMap(outputStreams);
-  }
-
-  Set<String> getBroadcastStreams() {
-    return Collections.unmodifiableSet(broadcastStreams);
-  }
-
-  Map<TableSpec, TableImpl> getTables() {
-    return Collections.unmodifiableMap(tables);
-  }
-
-  public Map<String, InputDescriptor> getInputDescriptors() {
-    return Collections.unmodifiableMap(inputDescriptors);
-  }
-
-  public Map<String, OutputDescriptor> getOutputDescriptors() {
-    return Collections.unmodifiableMap(outputDescriptors);
-  }
-
-  public Set<SystemDescriptor> getSystemDescriptors() {
-    // We enforce that users must not use different system descriptor instances for the same system name
-    // when getting an input/output stream or setting the default system descriptor
-    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
-  }
-
-  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
-    return this.defaultSystemDescriptorOptional;
-  }
-
-  private void checkSystemDescriptorUniqueness(SystemDescriptor systemDescriptor, String systemName) {
-    Preconditions.checkState(!systemDescriptors.containsKey(systemName)
-            || systemDescriptors.get(systemName) == systemDescriptor,
-        "Must not use different system descriptor instances for the same system name: " + systemName);
-  }
-
-  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
-    Serde keySerde, valueSerde;
-
-    if (serde instanceof KVSerde) {
-      keySerde = ((KVSerde) serde).getKeySerde();
-      valueSerde = ((KVSerde) serde).getValueSerde();
-    } else {
-      keySerde = new NoOpSerde();
-      valueSerde = serde;
-    }
-
-    if (keySerde instanceof NoOpSerde) {
-      LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
-          ". Keys will not be (de)serialized");
-    }
-    if (valueSerde instanceof NoOpSerde) {
-      LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
-          ". Values will not be (de)serialized");
-    }
-
-    return KV.of(keySerde, valueSerde);
-  }
-}