You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:48 UTC

[08/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
index e9e2635..2b29a2b 100644
--- a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
+++ b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.application;
 
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.task.TaskFactoryUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
deleted file mode 100644
index 3bc3ed5..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.TableImpl;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.hybrid.BaseHybridTableDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class defines:
- * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
- * and {@link Table} to create the DAG of transforms.
- * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
- */
-public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
-    implements StreamApplicationDescriptor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
-  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
-  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
-  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
-  private final Set<String> broadcastStreams = new HashSet<>();
-  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
-  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-  // We use a LHM for deterministic order in initializing and closing operators.
-  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
-  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
-  private final Map<String, TableImpl> tables = new LinkedHashMap<>();
-  private final Set<String> operatorIds = new HashSet<>();
-
-  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
-
-  /**
-   * The 0-based position of the next operator in the graph.
-   * Part of the unique ID for each OperatorSpec in the graph.
-   * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
-   */
-  private int nextOpNum = 0;
-
-  public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
-    super(userApp, config);
-    userApp.describe(this);
-  }
-
-  @Override
-  public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
-    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
-    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
-        "Default system must be set before creating any input or output streams.");
-    addSystemDescriptor(defaultSystemDescriptor);
-
-    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
-    return this;
-  }
-
-  @Override
-  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
-    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
-    Optional<StreamExpander> expander = systemDescriptor.getExpander();
-    if (expander.isPresent()) {
-      return expander.get().apply(this, inputDescriptor);
-    }
-
-    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
-    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
-        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
-    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
-    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
-
-    String streamId = inputDescriptor.getStreamId();
-    Preconditions.checkState(!inputOperators.containsKey(streamId),
-        "getInputStream must not be called multiple times with the same streamId: " + streamId);
-
-    Serde serde = inputDescriptor.getSerde();
-    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamId)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamId);
-      Serde keySerde = outputStream.getKeySerde();
-      Serde valueSerde = outputStream.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used for both.", streamId));
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
-    InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
-            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamId, inputOperatorSpec);
-    return new MessageStreamImpl(this, inputOperators.get(streamId));
-  }
-
-  @Override
-  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
-    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
-        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
-    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
-    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
-
-    String streamId = outputDescriptor.getStreamId();
-    Preconditions.checkState(!outputStreams.containsKey(streamId),
-        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
-
-    Serde serde = outputDescriptor.getSerde();
-    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamId)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
-      Serde keySerde = inputOperatorSpec.getKeySerde();
-      Serde valueSerde = inputOperatorSpec.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used for both.", streamId));
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return outputStreams.get(streamId);
-  }
-
-  @Override
-  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
-
-    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
-      List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
-      tableDescs.forEach(td -> getTable(td));
-    }
-
-    String tableId = tableDescriptor.getTableId();
-    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
-        String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
-    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
-        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
-    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
-
-    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
-    TableSpec tableSpec = baseTableDescriptor.getTableSpec();
-    if (tables.containsKey(tableSpec.getId())) {
-      throw new IllegalStateException(
-          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
-    }
-    tables.put(tableSpec.getId(), new TableImpl(tableSpec));
-    getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
-    return tables.get(tableSpec.getId());
-  }
-
-  /**
-   * Get all the {@link InputDescriptor}s to this application
-   *
-   * @return an immutable map of streamId to {@link InputDescriptor}
-   */
-  @Override
-  public Map<String, InputDescriptor> getInputDescriptors() {
-    return Collections.unmodifiableMap(inputDescriptors);
-  }
-
-  /**
-   * Get all the {@link OutputDescriptor}s from this application
-   *
-   * @return an immutable map of streamId to {@link OutputDescriptor}
-   */
-  @Override
-  public Map<String, OutputDescriptor> getOutputDescriptors() {
-    return Collections.unmodifiableMap(outputDescriptors);
-  }
-
-  /**
-   * Get all the broadcast streamIds from this application
-   *
-   * @return an immutable set of streamIds
-   */
-  @Override
-  public Set<String> getBroadcastStreams() {
-    return Collections.unmodifiableSet(broadcastStreams);
-  }
-
-  /**
-   * Get all the {@link TableDescriptor}s in this application
-   *
-   * @return an immutable set of {@link TableDescriptor}s
-   */
-  @Override
-  public Set<TableDescriptor> getTableDescriptors() {
-    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
-  }
-
-  /**
-   * Get all the unique {@link SystemDescriptor}s in this application
-   *
-   * @return an immutable set of {@link SystemDescriptor}s
-   */
-  @Override
-  public Set<SystemDescriptor> getSystemDescriptors() {
-    // We enforce that users must not use different system descriptor instances for the same system name
-    // when getting an input/output stream or setting the default system descriptor
-    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
-  }
-
-  @Override
-  public Set<String> getInputStreamIds() {
-    return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
-  }
-
-  @Override
-  public Set<String> getOutputStreamIds() {
-    return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
-  }
-
-  /**
-   * Get the default {@link SystemDescriptor} in this application
-   *
-   * @return the default {@link SystemDescriptor}
-   */
-  @Override
-  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
-    return defaultSystemDescriptorOptional;
-  }
-
-  public OperatorSpecGraph getOperatorSpecGraph() {
-    return new OperatorSpecGraph(this);
-  }
-
-  /**
-   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
-   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
-   *
-   * @param opCode the {@link OpCode} of the next operator
-   * @param userDefinedId the optional user-provided name of the next operator or null
-   * @return the unique ID for the next operator in the graph
-   */
-  public String getNextOpId(OpCode opCode, String userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
-      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
-    }
-
-    String nextOpId = String.format("%s-%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
-        opCode.name().toLowerCase(),
-        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
-    if (!operatorIds.add(nextOpId)) {
-      throw new SamzaException(
-          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
-    }
-    nextOpNum++;
-    return nextOpId;
-  }
-
-  /**
-   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
-   * jobName-jobId-opCode-nextOpNum;
-   *
-   * @param opCode the {@link OpCode} of the next operator
-   * @return the unique ID for the next operator in the graph
-   */
-  public String getNextOpId(OpCode opCode) {
-    return getNextOpId(opCode, null);
-  }
-
-  public Map<String, InputOperatorSpec> getInputOperators() {
-    return Collections.unmodifiableMap(inputOperators);
-  }
-
-  public Map<String, OutputStreamImpl> getOutputStreams() {
-    return Collections.unmodifiableMap(outputStreams);
-  }
-
-  public Map<String, TableImpl> getTables() {
-    return Collections.unmodifiableMap(tables);
-  }
-
-  /**
-   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
-   * An intermediate {@link MessageStream} is both an output and an input stream.
-   *
-   * @param streamId the id of the stream to be created.
-   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
-   *              is used.
-   * @param isBroadcast whether the stream is a broadcast stream.
-   * @param <M> the type of messages in the intermediate {@link MessageStream}
-   * @return  the intermediate {@link MessageStreamImpl}
-   */
-  @VisibleForTesting
-  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
-    Preconditions.checkNotNull(serde, "serde must not be null for intermediate stream: " + streamId);
-    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
-        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
-
-    if (isBroadcast) {
-      broadcastStreams.add(streamId);
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
-
-    InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
-        .flatMap(SystemDescriptor::getTransformer).orElse(null);
-
-    InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
-            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamId, inputOperatorSpec);
-    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
-  }
-
-  // check uniqueness of the {@code systemDescriptor} and add if it is unique
-  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
-    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
-            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
-        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
-    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
deleted file mode 100644
index b4fde1e..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.task.TaskFactory;
-
-
-/**
- * This class implements interface {@link TaskApplicationDescriptor}.
- * <p>
- * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes
- * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams,
- * and the list of {@link TableDescriptor}s used in the application.
- */
-public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
-    implements TaskApplicationDescriptor {
-
-  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
-  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
-  private final Set<String> broadcastStreams = new HashSet<>();
-  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
-  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-
-  private TaskFactory taskFactory = null;
-
-  public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
-    super(userApp, config);
-    userApp.describe(this);
-  }
-
-  @Override
-  public void setTaskFactory(TaskFactory factory) {
-    this.taskFactory = factory;
-  }
-
-  @Override
-  public void addInputStream(InputDescriptor inputDescriptor) {
-    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
-    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
-        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
-    getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
-    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
-    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
-  }
-
-  @Override
-  public void addOutputStream(OutputDescriptor outputDescriptor) {
-    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
-        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
-    getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
-    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
-    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
-  }
-
-  @Override
-  public void addTable(TableDescriptor tableDescriptor) {
-    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
-        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
-    getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
-    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
-  }
-
-  @Override
-  public Map<String, InputDescriptor> getInputDescriptors() {
-    return Collections.unmodifiableMap(inputDescriptors);
-  }
-
-  @Override
-  public Map<String, OutputDescriptor> getOutputDescriptors() {
-    return Collections.unmodifiableMap(outputDescriptors);
-  }
-
-  @Override
-  public Set<String> getBroadcastStreams() {
-    return Collections.unmodifiableSet(broadcastStreams);
-  }
-
-  @Override
-  public Set<TableDescriptor> getTableDescriptors() {
-    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
-  }
-
-  @Override
-  public Set<SystemDescriptor> getSystemDescriptors() {
-    // We enforce that users must not use different system descriptor instances for the same system name
-    // when getting an input/output stream or setting the default system descriptor
-    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
-  }
-
-  @Override
-  public Set<String> getInputStreamIds() {
-    return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
-  }
-
-  @Override
-  public Set<String> getOutputStreamIds() {
-    return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
-  }
-
-  /**
-   * Get the user-defined {@link TaskFactory}
-   * @return the {@link TaskFactory} object
-   */
-  public TaskFactory getTaskFactory() {
-    return taskFactory;
-  }
-
-  // check uniqueness of the {@code systemDescriptor} and add if it is unique
-  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
-    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
-            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
-        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
-    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
new file mode 100644
index 0000000..f3c34a9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is the base class that implements interface {@link ApplicationDescriptor}.
+ * <p>
+ * This base class contains the common objects that are used by both high-level and low-level API applications, such as
+ * {@link Config}, {@link ApplicationContainerContextFactory}, {@link ApplicationTaskContextFactory}, and
+ * {@link ProcessorLifecycleListenerFactory}.
+ *
+ * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
+ *            {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
+ */
+public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
+    implements ApplicationDescriptor<S> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
+
+  private final Class<? extends SamzaApplication> appClass;
+  private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
+  // serdes used by input/output/intermediate streams, keyed by streamId
+  private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
+  // serdes used by tables, keyed by tableId
+  private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
+  final Config config;
+
+  private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty();
+  private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty();
+
+  // Default to no-op  ProcessorLifecycleListenerFactory
+  ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { };
+
+  ApplicationDescriptorImpl(SamzaApplication app, Config config) {
+    this.config = config;
+    this.appClass = app.getClass();
+  }
+
+  @Override
+  public Config getConfig() {
+    return config;
+  }
+
+  @Override
+  public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
+    this.applicationContainerContextFactoryOptional = Optional.of(factory);
+    return (S) this;
+  }
+
+  @Override
+  public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
+    this.applicationTaskContextFactoryOptional = Optional.of(factory);
+    return (S) this;
+  }
+
+  @Override
+  public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
+    this.listenerFactory = listenerFactory;
+    return (S) this;
+  }
+
+  @Override
+  public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
+    this.reporterFactories.clear();
+    this.reporterFactories.putAll(reporterFactories);
+    return (S) this;
+  }
+
+  /**
+   * Get the application class
+   *
+   * @return an implementation of {@link SamzaApplication}
+   */
+  public Class<? extends SamzaApplication> getAppClass() {
+    return appClass;
+  }
+
+  /**
+   * Get the {@link ApplicationContainerContextFactory} specified by the application.
+   *
+   * @return {@link ApplicationContainerContextFactory} if application specified it; empty otherwise
+   */
+  public Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> getApplicationContainerContextFactory() {
+    @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationContainerContext
+    Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> factoryOptional =
+        (Optional) this.applicationContainerContextFactoryOptional;
+    return factoryOptional;
+  }
+
+  /**
+   * Get the {@link ApplicationTaskContextFactory} specified by the application.
+   *
+   * @return {@link ApplicationTaskContextFactory} if application specified it; empty otherwise
+   */
+  public Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> getApplicationTaskContextFactory() {
+    @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationTaskContext
+    Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> factoryOptional =
+        (Optional) this.applicationTaskContextFactoryOptional;
+    return factoryOptional;
+  }
+
+  /**
+   * Get the {@link ProcessorLifecycleListenerFactory} associated with this application
+   *
+   * @return the {@link ProcessorLifecycleListenerFactory} in this application
+   */
+  public ProcessorLifecycleListenerFactory getProcessorLifecycleListenerFactory() {
+    return listenerFactory;
+  }
+
+  /**
+   * Get the {@link MetricsReporterFactory}s used in the application
+   *
+   * @return the map of {@link MetricsReporterFactory}s
+   */
+  public Map<String, MetricsReporterFactory> getMetricsReporterFactories() {
+    return Collections.unmodifiableMap(reporterFactories);
+  }
+
+  /**
+   * Get the default {@link SystemDescriptor} in this application
+   *
+   * @return the default {@link SystemDescriptor}
+   */
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    // default is not set
+    return Optional.empty();
+  }
+
+  /**
+   * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+   *
+   * @param streamId id of the stream
+   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+   */
+  public KV<Serde, Serde> getStreamSerdes(String streamId) {
+    return streamSerdes.get(streamId);
+  }
+
+  /**
+   * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+   *
+   * @param tableId id of the table
+   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+   */
+  public KV<Serde, Serde> getTableSerdes(String tableId) {
+    return tableSerdes.get(tableId);
+  }
+
+  /**
+   * Get the map of all {@link InputOperatorSpec}s in this applicaiton
+   *
+   * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level
+   * {@link org.apache.samza.application.TaskApplication}
+   */
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.EMPTY_MAP;
+  }
+
+  /**
+   * Get all the {@link InputDescriptor}s to this application
+   *
+   * @return an immutable map of streamId to {@link InputDescriptor}
+   */
+  public abstract Map<String, InputDescriptor> getInputDescriptors();
+
+  /**
+   * Get all the {@link OutputDescriptor}s from this application
+   *
+   * @return an immutable map of streamId to {@link OutputDescriptor}
+   */
+  public abstract Map<String, OutputDescriptor> getOutputDescriptors();
+
+  /**
+   * Get all the broadcast streamIds from this application
+   *
+   * @return an immutable set of streamIds
+   */
+  public abstract Set<String> getBroadcastStreams();
+
+  /**
+   * Get all the {@link TableDescriptor}s in this application
+   *
+   * @return an immutable set of {@link TableDescriptor}s
+   */
+  public abstract Set<TableDescriptor> getTableDescriptors();
+
+  /**
+   * Get all the unique {@link SystemDescriptor}s in this application
+   *
+   * @return an immutable set of {@link SystemDescriptor}s
+   */
+  public abstract Set<SystemDescriptor> getSystemDescriptors();
+
+  /**
+   * Get all the unique input streamIds in this application
+   *
+   * @return an immutable set of input streamIds
+   */
+  public abstract Set<String> getInputStreamIds();
+
+  /**
+   * Get all the unique output streamIds in this application
+   *
+   * @return an immutable set of output streamIds
+   */
+  public abstract Set<String> getOutputStreamIds();
+
+  KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
+    Serde keySerde, valueSerde;
+
+    KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);
+
+    if (serde instanceof KVSerde) {
+      keySerde = ((KVSerde) serde).getKeySerde();
+      valueSerde = ((KVSerde) serde).getValueSerde();
+    } else {
+      keySerde = new NoOpSerde();
+      valueSerde = serde;
+    }
+
+    if (currentSerdePair == null) {
+      if (keySerde instanceof NoOpSerde) {
+        LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+            ". Keys will not be (de)serialized");
+      }
+      if (valueSerde instanceof NoOpSerde) {
+        LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+            ". Values will not be (de)serialized");
+      }
+      streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
+    } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+      throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+          + "different serdes.", streamId));
+    }
+    return streamSerdes.get(streamId);
+  }
+
+  KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
+    Serde keySerde, valueSerde;
+    keySerde = kvSerde.getKeySerde();
+    valueSerde = kvSerde.getValueSerde();
+
+    if (!tableSerdes.containsKey(tableId)) {
+      tableSerdes.put(tableId, KV.of(keySerde, valueSerde));
+      return tableSerdes.get(tableId);
+    }
+
+    KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
+    if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+      throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
+          + "different serdes.", tableId));
+    }
+    return streamSerdes.get(tableId);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java
new file mode 100644
index 0000000..e3c2d5c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Util class to help creating {@link ApplicationDescriptorImpl} instance from {@link SamzaApplication} and {@link Config}
+ */
+public class ApplicationDescriptorUtil {
+
+  private ApplicationDescriptorUtil() {
+
+  }
+
+  /**
+   * Create a new instance of {@link ApplicationDescriptorImpl} based on {@link SamzaApplication} and {@link Config}
+   *
+   * @param app an implementation of {@link SamzaApplication}. The {@code app} has to have a proper fully-qualified class name.
+   * @param config the {@link Config} for the application
+   * @return the {@link ApplicationDescriptorImpl} instance containing the processing logic and the config
+   */
+  public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) {
+    if (app instanceof StreamApplication) {
+      return new StreamApplicationDescriptorImpl((StreamApplication) app, config);
+    }
+    if (app instanceof TaskApplication) {
+      return new TaskApplicationDescriptorImpl((TaskApplication) app, config);
+    }
+    throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication "
+        + "and TaskApplication are supported.", app.getClass().getName()));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
new file mode 100644
index 0000000..ec36cf3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.descriptors.InputTransformer;
+import org.apache.samza.system.descriptors.StreamExpander;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines:
+ * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
+ * and {@link Table} to create the DAG of transforms.
+ * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
+ */
+public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
+    implements StreamApplicationDescriptor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+  // We use a LHM for deterministic order in initializing and closing operators.
+  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<String, TableImpl> tables = new LinkedHashMap<>();
+  private final Set<String> operatorIds = new HashSet<>();
+
+  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
+
+  /**
+   * The 0-based position of the next operator in the graph.
+   * Part of the unique ID for each OperatorSpec in the graph.
+   * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+   */
+  private int nextOpNum = 0;
+
+  public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
+    super(userApp, config);
+    userApp.describe(this);
+  }
+
+  @Override
+  public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
+    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+        "Default system must be set before creating any input or output streams.");
+    addSystemDescriptor(defaultSystemDescriptor);
+
+    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
+    return this;
+  }
+
+  @Override
+  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
+    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
+    Optional<StreamExpander> expander = systemDescriptor.getExpander();
+    if (expander.isPresent()) {
+      return expander.get().apply(this, inputDescriptor);
+    }
+
+    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
+        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
+    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+
+    String streamId = inputDescriptor.getStreamId();
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
+        "getInputStream must not be called multiple times with the same streamId: " + streamId);
+
+    Serde serde = inputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
+    if (outputStreams.containsKey(streamId)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
+      Serde keySerde = outputStream.getKeySerde();
+      Serde valueSerde = outputStream.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl(this, inputOperators.get(streamId));
+  }
+
+  @Override
+  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
+    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
+        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
+    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+
+    String streamId = outputDescriptor.getStreamId();
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
+        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
+
+    Serde serde = outputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
+    if (inputOperators.containsKey(streamId)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
+      Serde keySerde = inputOperatorSpec.getKeySerde();
+      Serde valueSerde = inputOperatorSpec.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
+  }
+
+  @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
+
+    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+      List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+      tableDescs.forEach(td -> getTable(td));
+    }
+
+    String tableId = tableDescriptor.getTableId();
+    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
+        String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
+    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
+        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
+
+    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+    TableSpec tableSpec = baseTableDescriptor.getTableSpec();
+    if (tables.containsKey(tableSpec.getId())) {
+      throw new IllegalStateException(
+          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
+    }
+    tables.put(tableSpec.getId(), new TableImpl(tableSpec));
+    getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
+    return tables.get(tableSpec.getId());
+  }
+
+  /**
+   * Get all the {@link InputDescriptor}s to this application
+   *
+   * @return an immutable map of streamId to {@link InputDescriptor}
+   */
+  @Override
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  /**
+   * Get all the {@link OutputDescriptor}s from this application
+   *
+   * @return an immutable map of streamId to {@link OutputDescriptor}
+   */
+  @Override
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  /**
+   * Get all the broadcast streamIds from this application
+   *
+   * @return an immutable set of streamIds
+   */
+  @Override
+  public Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
+  /**
+   * Get all the {@link TableDescriptor}s in this application
+   *
+   * @return an immutable set of {@link TableDescriptor}s
+   */
+  @Override
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
+
+  /**
+   * Get all the unique {@link SystemDescriptor}s in this application
+   *
+   * @return an immutable set of {@link SystemDescriptor}s
+   */
+  @Override
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor instances for the same system name
+    // when getting an input/output stream or setting the default system descriptor
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
+
+  @Override
+  public Set<String> getInputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
+  }
+
+  @Override
+  public Set<String> getOutputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
+  }
+
+  /**
+   * Get the default {@link SystemDescriptor} in this application
+   *
+   * @return the default {@link SystemDescriptor}
+   */
+  @Override
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    return defaultSystemDescriptorOptional;
+  }
+
+  public OperatorSpecGraph getOperatorSpecGraph() {
+    return new OperatorSpecGraph(this);
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @param userDefinedId the optional user-provided name of the next operator or null
+   * @return the unique ID for the next operator in the graph
+   */
+  public String getNextOpId(OpCode opCode, String userDefinedId) {
+    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
+    }
+
+    String nextOpId = String.format("%s-%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        opCode.name().toLowerCase(),
+        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+    if (!operatorIds.add(nextOpId)) {
+      throw new SamzaException(
+          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+    }
+    nextOpNum++;
+    return nextOpId;
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-nextOpNum;
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @return the unique ID for the next operator in the graph
+   */
+  public String getNextOpId(OpCode opCode) {
+    return getNextOpId(opCode, null);
+  }
+
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
+  }
+
+  public Map<String, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
+  }
+
+  public Map<String, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
+  /**
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
+   *
+   * @param streamId the id of the stream to be created.
+   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
+   *              is used.
+   * @param isBroadcast whether the stream is a broadcast stream.
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return  the intermediate {@link MessageStreamImpl}
+   */
+  @VisibleForTesting
+  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
+    Preconditions.checkNotNull(serde, "serde must not be null for intermediate stream: " + streamId);
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
+        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
+
+    if (isBroadcast) {
+      broadcastStreams.add(streamId);
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
+
+    InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
+        .flatMap(SystemDescriptor::getTransformer).orElse(null);
+
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
+  }
+
+  // check uniqueness of the {@code systemDescriptor} and add if it is unique
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
new file mode 100644
index 0000000..cb924ab
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ * This class implements interface {@link TaskApplicationDescriptor}.
+ * <p>
+ * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes
+ * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams,
+ * and the list of {@link TableDescriptor}s used in the application.
+ */
+public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
+    implements TaskApplicationDescriptor {
+
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+
+  private TaskFactory taskFactory = null;
+
+  public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
+    super(userApp, config);
+    userApp.describe(this);
+  }
+
+  @Override
+  public void setTaskFactory(TaskFactory factory) {
+    this.taskFactory = factory;
+  }
+
+  @Override
+  public void addInputStream(InputDescriptor inputDescriptor) {
+    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
+        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+    getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
+    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
+    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+  }
+
+  @Override
+  public void addOutputStream(OutputDescriptor outputDescriptor) {
+    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
+        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+    getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
+    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
+    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+  }
+
+  @Override
+  public void addTable(TableDescriptor tableDescriptor) {
+    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
+        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+    getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
+    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
+  }
+
+  @Override
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  @Override
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  @Override
+  public Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
+  @Override
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
+
+  @Override
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor instances for the same system name
+    // when getting an input/output stream or setting the default system descriptor
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
+
+  @Override
+  public Set<String> getInputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
+  }
+
+  @Override
+  public Set<String> getOutputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
+  }
+
+  /**
+   * Get the user-defined {@link TaskFactory}
+   * @return the {@link TaskFactory} object
+   */
+  public TaskFactory getTaskFactory() {
+    return taskFactory;
+  }
+
+  // check uniqueness of the {@code systemDescriptor} and add if it is unique
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
index ed013c4..06f5606 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -59,9 +59,9 @@ public class JavaTableConfig extends MapConfig {
   }
 
   /**
-   * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
+   * Get the {@link org.apache.samza.table.descriptors.TableProviderFactory} class for a table
    * @param tableId Id of the table
-   * @return the {@link org.apache.samza.table.TableProviderFactory} class name
+   * @return the {@link org.apache.samza.table.descriptors.TableProviderFactory} class name
    */
   public String getTableProviderFactory(String tableId) {
     return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index b80f7df..10a4215 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -35,15 +35,15 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index f43b24e..6d9faf3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -30,8 +30,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 18705e4..4a2a235 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -200,8 +200,8 @@ import org.codehaus.jackson.map.ObjectMapper;
   }
 
   /**
-   * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.application.ApplicationDescriptorImpl}
-   * for this job
+   * Create JSON POJO for a {@link JobNode}, including the
+   * {@link org.apache.samza.application.descriptors.ApplicationDescriptorImpl} for this job
    *
    * @param jobNode job node in the {@link JobGraph}
    * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index af556f5..82b4178 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -26,8 +26,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 83f3f61..dc0fc59 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 86aca0f..6ca5f3d 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -23,8 +23,8 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index 54f86d5..13b29df 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -21,8 +21,8 @@ package org.apache.samza.execution;
 import java.util.List;
 import java.util.UUID;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
deleted file mode 100644
index dd47af2..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Base class for all table descriptor implementations.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor
- */
-abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
-    implements TableDescriptor<K, V, D> {
-
-  protected final String tableId;
-
-  protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
-
-  protected final Map<String, String> config = new HashMap<>();
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   */
-  protected BaseTableDescriptor(String tableId) {
-    this.tableId = tableId;
-  }
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   * @param serde the serde for key and value
-   */
-  protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    this.tableId = tableId;
-    this.serde = serde;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public D withConfig(String key, String value) {
-    config.put(key, value);
-    return (D) this;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String getTableId() {
-    return tableId;
-  }
-
-  /**
-   * Get the serde assigned to this {@link TableDescriptor}
-   *
-   * @return {@link KVSerde} used by this table
-   */
-  public KVSerde<K, V> getSerde() {
-    return serde;
-  }
-
-  /**
-   * Generate config for {@link TableSpec}; this method is used internally.
-   * @param tableSpecConfig configuration for the {@link TableSpec}
-   */
-  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
-    tableSpecConfig.putAll(config);
-  }
-
-  /**
-   * Validate that this table descriptor is constructed properly; this method is used internally.
-   */
-  protected void validate() {
-  }
-
-  /**
-   * Create a {@link TableSpec} from this table descriptor; this method is used internally.
-   *
-   * @return the {@link TableSpec}
-   */
-  abstract public TableSpec getTableSpec();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 09e4868..0f43c5e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -24,7 +24,7 @@ import java.time.Duration;
 import java.util.Collection;
 
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index 5329fd7..a83739d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -25,7 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
deleted file mode 100644
index 6c4ae49..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for samza framework internal usage.
- * <p>
- * Allows creating a {@link SystemDescriptor} without setting the factory class name, and delegating
- * rest of the system customization to configurations.
- * <p>
- * Useful for code-generation and testing use cases where the factory name is not known in advance.
- */
-@SuppressWarnings("unchecked")
-public final class DelegatingSystemDescriptor extends SystemDescriptor<DelegatingSystemDescriptor>
-    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
-
-  /**
-   * Constructs an {@link DelegatingSystemDescriptor} instance with no system level serde.
-   * Serdes must be provided explicitly at stream level when getting input or output descriptors.
-   * SystemFactory class name must be provided in configuration.
-   *
-   * @param systemName name of this system
-   */
-  @VisibleForTesting
-  public DelegatingSystemDescriptor(String systemName) {
-    super(systemName, null, null, null);
-  }
-
-  @Override
-  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new GenericInputDescriptor<>(streamId, this, serde);
-  }
-
-  @Override
-  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new GenericOutputDescriptor<>(streamId, this, serde);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index 2a73064..fbeda3e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -20,7 +20,7 @@ package org.apache.samza.operators.impl;
 
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.system.descriptors.InputTransformer;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.IncomingMessageEnvelope;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 1af4806..b467d60 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.system.descriptors.InputTransformer;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 0442f7c..1886d1b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -103,7 +103,8 @@ public abstract class OperatorSpec<M, OM> implements Serializable {
   }
 
   /**
-   * Get the unique ID of this operator in the {@link org.apache.samza.application.StreamApplicationDescriptorImpl}.
+   * Get the unique ID of this operator in the
+   * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl}.
    * @return  the unique operator ID
    */
   public final String getOpId() {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 6ebbdae..8d3ff60 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -22,7 +22,7 @@ package org.apache.samza.operators.spec;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.system.descriptors.InputTransformer;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 4db8e60..44f62d9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.stream;
 
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.spec.InputOperatorSpec;