You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:52 UTC

[12/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

Consolidating package names for System, Stream, Application and Table descriptors.

Everything in this PR is either:
1. A package name change and a corresponding file move.
2. Javadoc changes to use fully qualified names (FQNs) in {@link} tags to fix checkstyle complaints about unused imports, and corresponding re-wrapping to keep the affected Javadoc lines within the line-width limit. No change in content.
3. In a couple of places (RemoteReadWriteTable.java, RemoteReadableTable.java), changing method visibility to public with @VisibleForTesting annotations so that the methods can be accessed from tests.

Author: Prateek Maheshwari <pm...@apache.org>

Reviewers: Bharath Kumarasubramanian <bk...@linkedin.com>, Jagadish Venkatraman <vj...@gmail.com>, Yi Pan <ni...@gmail.com>

Closes #720 from prateekm/descriptor-package-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74675cea
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74675cea
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74675cea

Branch: refs/heads/master
Commit: 74675cea55d163bf18bf16c8619355009af2300c
Parents: 9a5094d
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Fri Oct 12 18:34:34 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Fri Oct 12 18:34:34 2018 -0700

----------------------------------------------------------------------
 docs/startup/quick-start/versioned/index.md     |   2 +-
 .../application/ApplicationDescriptor.java      |  94 ---
 .../samza/application/SamzaApplication.java     |   1 +
 .../samza/application/StreamApplication.java    |   1 +
 .../StreamApplicationDescriptor.java            | 107 ----
 .../samza/application/TaskApplication.java      |   1 +
 .../application/TaskApplicationDescriptor.java  |  64 --
 .../descriptors/ApplicationDescriptor.java      |  96 +++
 .../StreamApplicationDescriptor.java            | 107 ++++
 .../descriptors/TaskApplicationDescriptor.java  |  65 ++
 .../ApplicationContainerContextFactory.java     |   3 +-
 .../context/ApplicationTaskContextFactory.java  |   3 +-
 .../apache/samza/operators/MessageStream.java   |   5 +-
 .../apache/samza/operators/TableDescriptor.java |  64 --
 .../descriptors/GenericInputDescriptor.java     |  48 --
 .../descriptors/GenericOutputDescriptor.java    |  48 --
 .../descriptors/GenericSystemDescriptor.java    |  61 --
 .../base/stream/InputDescriptor.java            | 183 ------
 .../base/stream/OutputDescriptor.java           |  44 --
 .../base/stream/StreamDescriptor.java           | 136 -----
 .../ExpandingInputDescriptorProvider.java       |  44 --
 .../base/system/OutputDescriptorProvider.java   |  48 --
 .../system/SimpleInputDescriptorProvider.java   |  43 --
 .../base/system/SystemDescriptor.java           | 177 ------
 .../TransformingInputDescriptorProvider.java    |  44 --
 .../operators/functions/ClosableFunction.java   |  12 +-
 .../operators/functions/InitableFunction.java   |   9 +-
 .../operators/functions/InputTransformer.java   |  45 --
 .../operators/functions/StreamExpander.java     |  58 --
 .../ExpandingInputDescriptorProvider.java       |  43 ++
 .../descriptors/GenericInputDescriptor.java     |  47 ++
 .../descriptors/GenericOutputDescriptor.java    |  47 ++
 .../descriptors/GenericSystemDescriptor.java    |  58 ++
 .../system/descriptors/InputDescriptor.java     | 181 ++++++
 .../system/descriptors/InputTransformer.java    |  47 ++
 .../system/descriptors/OutputDescriptor.java    |  43 ++
 .../descriptors/OutputDescriptorProvider.java   |  47 ++
 .../SimpleInputDescriptorProvider.java          |  42 ++
 .../system/descriptors/StreamDescriptor.java    | 135 +++++
 .../system/descriptors/StreamExpander.java      |  57 ++
 .../system/descriptors/SystemDescriptor.java    | 175 ++++++
 .../TransformingInputDescriptorProvider.java    |  43 ++
 .../samza/table/TableDescriptorsProvider.java   |   2 +-
 .../org/apache/samza/table/TableProvider.java   |  61 --
 .../samza/table/TableProviderFactory.java       |  35 --
 .../java/org/apache/samza/table/TableSpec.java  |   5 +-
 .../table/descriptors/TableDescriptor.java      |  64 ++
 .../samza/table/descriptors/TableProvider.java  |  62 ++
 .../table/descriptors/TableProviderFactory.java |  36 ++
 .../TestExpandingInputDescriptor.java           |  59 --
 .../descriptors/TestGenericInputDescriptor.java | 123 ----
 .../TestGenericSystemDescriptor.java            |  63 --
 .../descriptors/TestSimpleInputDescriptor.java  |  63 --
 .../TestTransformingInputDescriptor.java        |  64 --
 .../ExampleExpandingInputDescriptor.java        |  30 -
 .../ExampleExpandingOutputDescriptor.java       |  29 -
 .../ExampleExpandingSystemDescriptor.java       |  49 --
 .../serde/ExampleSimpleInputDescriptor.java     |  30 -
 .../serde/ExampleSimpleOutputDescriptor.java    |  29 -
 .../serde/ExampleSimpleSystemDescriptor.java    |  43 --
 .../ExampleTransformingInputDescriptor.java     |  30 -
 .../ExampleTransformingOutputDescriptor.java    |  29 -
 .../ExampleTransformingSystemDescriptor.java    |  43 --
 .../TestExpandingInputDescriptor.java           |  59 ++
 .../descriptors/TestGenericInputDescriptor.java | 123 ++++
 .../TestGenericSystemDescriptor.java            |  63 ++
 .../descriptors/TestSimpleInputDescriptor.java  |  63 ++
 .../TestTransformingInputDescriptor.java        |  64 ++
 .../ExampleExpandingInputDescriptor.java        |  30 +
 .../ExampleExpandingOutputDescriptor.java       |  29 +
 .../ExampleExpandingSystemDescriptor.java       |  49 ++
 .../serde/ExampleSimpleInputDescriptor.java     |  30 +
 .../serde/ExampleSimpleOutputDescriptor.java    |  29 +
 .../serde/ExampleSimpleSystemDescriptor.java    |  43 ++
 .../ExampleTransformingInputDescriptor.java     |  30 +
 .../ExampleTransformingOutputDescriptor.java    |  29 +
 .../ExampleTransformingSystemDescriptor.java    |  43 ++
 .../eventhub/EventHubsInputDescriptor.java      | 121 ----
 .../eventhub/EventHubsOutputDescriptor.java     | 104 ----
 .../eventhub/EventHubsSystemDescriptor.java     | 217 -------
 .../descriptors/EventHubsInputDescriptor.java   | 122 ++++
 .../descriptors/EventHubsOutputDescriptor.java  | 105 ++++
 .../descriptors/EventHubsSystemDescriptor.java  | 219 +++++++
 .../eventhub/TestEventHubsInputDescriptor.java  |  91 ---
 .../eventhub/TestEventHubsOutputDescriptor.java |  88 ---
 .../eventhub/TestEventHubsSystemDescriptor.java | 112 ----
 .../TestEventHubsInputDescriptor.java           |  92 +++
 .../TestEventHubsOutputDescriptor.java          |  89 +++
 .../TestEventHubsSystemDescriptor.java          | 113 ++++
 .../application/ApplicationDescriptorImpl.java  | 298 ---------
 .../application/ApplicationDescriptorUtil.java  |  51 --
 .../application/LegacyTaskApplication.java      |   1 +
 .../StreamApplicationDescriptorImpl.java        | 366 -----------
 .../TaskApplicationDescriptorImpl.java          | 143 -----
 .../descriptors/ApplicationDescriptorImpl.java  | 300 +++++++++
 .../descriptors/ApplicationDescriptorUtil.java  |  54 ++
 .../StreamApplicationDescriptorImpl.java        | 367 +++++++++++
 .../TaskApplicationDescriptorImpl.java          | 144 +++++
 .../apache/samza/config/JavaTableConfig.java    |   4 +-
 .../samza/execution/ExecutionPlanner.java       |   6 +-
 .../org/apache/samza/execution/JobGraph.java    |   4 +-
 .../samza/execution/JobGraphJsonGenerator.java  |   4 +-
 .../org/apache/samza/execution/JobNode.java     |   4 +-
 .../org/apache/samza/execution/JobPlanner.java  |   4 +-
 .../apache/samza/execution/LocalJobPlanner.java |   4 +-
 .../samza/execution/RemoteJobPlanner.java       |   4 +-
 .../samza/operators/BaseTableDescriptor.java    | 110 ----
 .../samza/operators/MessageStreamImpl.java      |   2 +-
 .../samza/operators/OperatorSpecGraph.java      |   2 +-
 .../descriptors/DelegatingSystemDescriptor.java |  64 --
 .../samza/operators/impl/InputOperatorImpl.java |   2 +-
 .../samza/operators/spec/InputOperatorSpec.java |   2 +-
 .../samza/operators/spec/OperatorSpec.java      |   3 +-
 .../samza/operators/spec/OperatorSpecs.java     |   2 +-
 .../stream/IntermediateMessageStreamImpl.java   |   2 +-
 .../samza/runtime/LocalApplicationRunner.java   |   6 +-
 .../samza/runtime/LocalContainerRunner.java     |   6 +-
 .../samza/runtime/RemoteApplicationRunner.java  |   6 +-
 .../descriptors/DelegatingSystemDescriptor.java |  61 ++
 .../samza/table/TableConfigGenerator.java       |   6 +-
 .../org/apache/samza/table/TableManager.java    |   2 +
 .../table/caching/CachingTableDescriptor.java   | 164 -----
 .../table/caching/CachingTableProvider.java     | 104 ----
 .../caching/CachingTableProviderFactory.java    |  34 --
 .../descriptors/CachingTableDescriptor.java     | 166 +++++
 .../descriptors/CachingTableProvider.java       | 105 ++++
 .../CachingTableProviderFactory.java            |  34 ++
 .../guava/GuavaCacheTableDescriptor.java        |  75 ---
 .../caching/guava/GuavaCacheTableProvider.java  |  59 --
 .../guava/GuavaCacheTableProviderFactory.java   |  34 --
 .../descriptors/GuavaCacheTableDescriptor.java  |  75 +++
 .../descriptors/GuavaCacheTableProvider.java    |  60 ++
 .../GuavaCacheTableProviderFactory.java         |  34 ++
 .../descriptors/BaseHybridTableDescriptor.java  |  48 ++
 .../table/descriptors/BaseTableDescriptor.java  | 110 ++++
 .../table/hybrid/BaseHybridTableDescriptor.java |  50 --
 .../table/remote/RemoteReadWriteTable.java      |  15 +-
 .../samza/table/remote/RemoteReadableTable.java |  26 +-
 .../table/remote/RemoteTableDescriptor.java     | 275 ---------
 .../samza/table/remote/RemoteTableProvider.java | 200 ------
 .../remote/RemoteTableProviderFactory.java      |  38 --
 .../descriptors/RemoteTableDescriptor.java      | 278 +++++++++
 .../remote/descriptors/RemoteTableProvider.java | 202 +++++++
 .../descriptors/RemoteTableProviderFactory.java |  38 ++
 .../table/retry/RetriableReadFunction.java      |   2 +-
 .../table/retry/RetriableWriteFunction.java     |   2 +-
 .../samza/table/utils/BaseTableProvider.java    |  73 ---
 .../utils/descriptors/BaseTableProvider.java    |  73 +++
 .../apache/samza/task/StreamOperatorTask.java   |  11 +-
 .../org/apache/samza/task/TaskFactoryUtil.java  |   8 +-
 .../samza/job/local/ThreadJobFactory.scala      |   3 +-
 .../application/MockStreamApplication.java      |   2 +
 .../samza/application/TestApplicationUtil.java  |   2 +
 .../TestStreamApplicationDescriptorImpl.java    | 601 ------------------
 .../TestTaskApplicationDescriptorImpl.java      | 172 ------
 .../TestStreamApplicationDescriptorImpl.java    | 602 +++++++++++++++++++
 .../TestTaskApplicationDescriptorImpl.java      | 173 ++++++
 .../execution/ExecutionPlannerTestBase.java     |  10 +-
 .../samza/execution/TestExecutionPlanner.java   |  22 +-
 .../apache/samza/execution/TestJobGraph.java    |   2 +-
 .../execution/TestJobGraphJsonGenerator.java    |   8 +-
 .../TestJobNodeConfigurationGenerator.java      |  14 +-
 .../samza/execution/TestLocalJobPlanner.java    |   6 +-
 .../samza/execution/TestRemoteJobPlanner.java   |   6 +-
 .../samza/operators/TestJoinOperator.java       |   6 +-
 .../samza/operators/TestMessageStreamImpl.java  |   2 +-
 .../samza/operators/TestOperatorSpecGraph.java  |   2 +-
 .../operators/impl/TestOperatorImplGraph.java   |   8 +-
 .../operators/impl/TestWindowOperator.java      |   6 +-
 .../spec/TestPartitionByOperatorSpec.java       |   6 +-
 .../runtime/TestLocalApplicationRunner.java     |   6 +-
 .../apache/samza/table/TestTableManager.java    |   2 +
 .../samza/table/caching/TestCachingTable.java   |  10 +-
 .../table/remote/TestRemoteTableDescriptor.java | 236 --------
 .../descriptors/TestRemoteTableDescriptor.java  | 239 ++++++++
 .../apache/samza/task/TestTaskFactoryUtil.java  |   6 +-
 .../system/kafka/KafkaInputDescriptor.java      | 108 ----
 .../system/kafka/KafkaOutputDescriptor.java     |  39 --
 .../system/kafka/KafkaSystemDescriptor.java     | 245 --------
 .../kafka/descriptors/KafkaInputDescriptor.java | 108 ++++
 .../descriptors/KafkaOutputDescriptor.java      |  39 ++
 .../descriptors/KafkaSystemDescriptor.java      | 246 ++++++++
 .../system/kafka/TestKafkaInputDescriptor.java  |  66 --
 .../kafka/TestKafkaSystemAdminWithMock.java     |   1 -
 .../system/kafka/TestKafkaSystemDescriptor.java |  69 ---
 .../descriptors/TestKafkaInputDescriptor.java   |  64 ++
 .../descriptors/TestKafkaSystemDescriptor.java  |  70 +++
 .../kv/inmemory/InMemoryTableDescriptor.java    |  74 ---
 .../kv/inmemory/InMemoryTableProvider.java      |  70 ---
 .../inmemory/InMemoryTableProviderFactory.java  |  33 -
 .../descriptors/InMemoryTableDescriptor.java    |  74 +++
 .../descriptors/InMemoryTableProvider.java      |  71 +++
 .../InMemoryTableProviderFactory.java           |  33 +
 .../inmemory/TestInMemoryTableDescriptor.java   |  48 --
 .../kv/inmemory/TestInMemoryTableProvider.java  |  66 --
 .../TestInMemoryTableDescriptor.java            |  48 ++
 .../descriptors/TestInMemoryTableProvider.java  |  67 +++
 .../storage/kv/RocksDbTableDescriptor.java      | 339 -----------
 .../samza/storage/kv/RocksDbTableProvider.java  |  73 ---
 .../storage/kv/RocksDbTableProviderFactory.java |  31 -
 .../kv/descriptors/RocksDbTableDescriptor.java  | 339 +++++++++++
 .../kv/descriptors/RocksDbTableProvider.java    |  74 +++
 .../RocksDbTableProviderFactory.java            |  31 +
 .../storage/kv/TestRocksDbTableDescriptor.java  | 100 ---
 .../storage/kv/TestRocksDbTableProvider.java    |  67 ---
 .../descriptors/TestRocksDbTableDescriptor.java | 100 +++
 .../descriptors/TestRocksDbTableProvider.java   |  68 +++
 .../kv/BaseLocalStoreBackedTableDescriptor.java | 168 ------
 .../kv/BaseLocalStoreBackedTableProvider.java   | 147 -----
 .../BaseLocalStoreBackedTableDescriptor.java    | 168 ++++++
 .../BaseLocalStoreBackedTableProvider.java      | 149 +++++
 .../TestBaseLocalStoreBackedTableProvider.java  | 149 -----
 .../TestBaseLocalStoreBackedTableProvider.java  | 150 +++++
 .../sql/impl/ConfigBasedIOResolverFactory.java  |   4 +-
 .../samza/sql/interfaces/SqlIOConfig.java       |   2 +-
 .../samza/sql/runner/SamzaSqlApplication.java   |   2 +-
 .../samza/sql/translator/ModifyTranslator.java  |   8 +-
 .../samza/sql/translator/QueryTranslator.java   |  10 +-
 .../samza/sql/translator/ScanTranslator.java    |   6 +-
 .../samza/sql/translator/TranslatorContext.java |   4 +-
 .../sql/testutil/TestIOResolverFactory.java     |  12 +-
 .../sql/translator/TestFilterTranslator.java    |   2 +-
 .../sql/translator/TestJoinTranslator.java      |   6 +-
 .../sql/translator/TestProjectTranslator.java   |   2 +-
 .../sql/translator/TestQueryTranslator.java     |   2 +-
 .../sql/translator/TranslatorTestBase.java      |   5 +-
 .../example/AppWithGlobalConfigExample.java     |   8 +-
 .../apache/samza/example/BroadcastExample.java  |   8 +-
 .../samza/example/KeyValueStoreExample.java     |   8 +-
 .../org/apache/samza/example/MergeExample.java  |   8 +-
 .../samza/example/OrderShipmentJoinExample.java |   8 +-
 .../samza/example/PageViewCounterExample.java   |   8 +-
 .../samza/example/RepartitionExample.java       |   8 +-
 .../samza/example/TaskApplicationExample.java   |  12 +-
 .../org/apache/samza/example/WindowExample.java |   8 +-
 .../samza/test/framework/StreamAssert.java      |   6 +-
 .../apache/samza/test/framework/TestRunner.java |   6 +-
 .../system/InMemoryInputDescriptor.java         |  42 --
 .../system/InMemoryOutputDescriptor.java        |  46 --
 .../system/InMemorySystemDescriptor.java        | 109 ----
 .../descriptors/InMemoryInputDescriptor.java    |  42 ++
 .../descriptors/InMemoryOutputDescriptor.java   |  46 ++
 .../descriptors/InMemorySystemDescriptor.java   | 109 ++++
 .../TestStandaloneIntegrationApplication.java   |   8 +-
 .../EndOfStreamIntegrationTest.java             |   6 +-
 .../WatermarkIntegrationTest.java               |   6 +-
 .../AsyncStreamTaskIntegrationTest.java         |   6 +-
 .../test/framework/BroadcastAssertApp.java      |   6 +-
 .../test/framework/FaultInjectionTest.java      |   6 +-
 .../StreamApplicationIntegrationTest.java       |  19 +-
 .../framework/StreamTaskIntegrationTest.java    |  21 +-
 .../samza/test/framework/TestSchedulingApp.java |   6 +-
 .../test/operator/RepartitionJoinWindowApp.java |   6 +-
 .../test/operator/RepartitionWindowApp.java     |   8 +-
 .../samza/test/operator/SessionWindowApp.java   |   8 +-
 .../samza/test/operator/TumblingWindowApp.java  |   8 +-
 .../test/processor/TestStreamApplication.java   |   8 +-
 .../apache/samza/test/table/TestLocalTable.java |   8 +-
 .../table/TestLocalTableWithSideInputs.java     |  16 +-
 .../samza/test/table/TestRemoteTable.java       |  16 +-
 .../table/TestTableDescriptorsProvider.java     |  10 +-
 .../benchmark/SystemConsumerWithSamzaBench.java |   4 +-
 262 files changed, 8425 insertions(+), 8353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/docs/startup/quick-start/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/quick-start/versioned/index.md b/docs/startup/quick-start/versioned/index.md
index 44b8376..a046ee7 100644
--- a/docs/startup/quick-start/versioned/index.md
+++ b/docs/startup/quick-start/versioned/index.md
@@ -54,7 +54,7 @@ Now let’s write some code! The first step is to create your own Samza applicat
 package samzaapp;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 
 public class WordCount implements StreamApplication {
  @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
deleted file mode 100644
index e806aad..0000000
--- a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-
-
-/**
- * The interface class to describe the configuration, input and output streams, and processing logic in a {@link SamzaApplication}.
- * <p>
- * Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for applications
- * written in high-level {@link StreamApplication} and low-level {@link TaskApplication} APIs, respectively.
- *
- * @param <S> sub-class of user application descriptor.
- */
-@InterfaceStability.Evolving
-public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
-
-  /**
-   * Get the {@link Config} of the application
-   * @return config of the application
-   */
-  Config getConfig();
-
-  /**
-   * Sets the {@link ApplicationContainerContextFactory} for this application. Each task will be given access to a
-   * different instance of the {@link org.apache.samza.context.ApplicationContainerContext} that this creates. The
-   * context can be accessed through the {@link org.apache.samza.context.Context}.
-   * <p>
-   * Setting this is optional.
-   *
-   * @param factory the {@link ApplicationContainerContextFactory} for this application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
-   * {@link ApplicationContainerContextFactory}
-   */
-  S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
-
-  /**
-   * Sets the {@link ApplicationTaskContextFactory} for this application. Each task will be given access to a different
-   * instance of the {@link org.apache.samza.context.ApplicationTaskContext} that this creates. The context can be
-   * accessed through the {@link org.apache.samza.context.Context}.
-   * <p>
-   * Setting this is optional.
-   *
-   * @param factory the {@link ApplicationTaskContextFactory} for this application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
-   * {@link ApplicationTaskContextFactory}
-   */
-  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);
-
-  /**
-   * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
-   *
-   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
-   * plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
-   * the application.
-   *
-   * @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
-   *                        with callback methods before and after the start/stop of each StreamProcessor in the application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
-   */
-  S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
-
-  /**
-   * Sets a set of customized {@link MetricsReporterFactory}s in the application
-   *
-   * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
-   */
-  S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
index 7606be8..5423e2e 100644
--- a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index a83cb37..fe77045 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 
 /**
  * Describes and initializes the transforms for processing message streams and generating results in high-level API. 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
deleted file mode 100644
index dc24771..0000000
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.table.Table;
-
-
-/**
- * The interface class to describe a {@link SamzaApplication} in high-level API in Samza.
- */
-@InterfaceStability.Evolving
-public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
-
-  /**
-   * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
-   * {@code job.default.system} and its properties in configuration.
-   * <p>
-   * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
-   *
-   * @param defaultSystemDescriptor the default system descriptor to use
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
-   */
-  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
-
-  /**
-   * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
-   * <p>
-   * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
-   * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with
-   * any other {@code Serde<M>}, can receive messages of type M - the key in the incoming message is ignored.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
-   * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from
-   * the framework.
-   * <p>
-   * Multiple invocations of this method with the same {@code inputDescriptor} will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param inputDescriptor the descriptor for the stream
-   * @param <M> the type of messages in the input {@link MessageStream}
-   * @return the input {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor}
-   */
-  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
-
-  /**
-   * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}.
-   * <p>
-   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
-   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any
-   * other {@code Serde<M>}, can send messages of type M without a key.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
-   * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from
-   * the framework.
-   * <p>
-   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key.
-   * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key.
-   * <p>
-   * Multiple invocations of this method with the same {@code outputDescriptor} will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param outputDescriptor the descriptor for the stream
-   * @param <M> the type of messages in the {@link OutputStream}
-   * @return the {@link OutputStream}
-   * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor}
-   */
-  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
-
-  /**
-   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
-   * <p>
-   * Multiple invocations of this method with the same {@link TableDescriptor} will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param tableDescriptor the {@link TableDescriptor}
-   * @param <K> the type of the key
-   * @param <V> the type of the value
-   * @return the {@link Table} corresponding to the {@code tableDescriptor}
-   * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor}
-   */
-  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
index 424634d..d84aa12 100644
--- a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
deleted file mode 100644
index 0226bb5..0000000
--- a/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.task.TaskFactory;
-
-
-/**
- *  The interface to describe a {@link SamzaApplication} that uses low-level API task for processing.
- */
-@InterfaceStability.Evolving
-public interface TaskApplicationDescriptor extends ApplicationDescriptor<TaskApplicationDescriptor> {
-
-  /**
-   * Sets the {@link TaskFactory} for the user application. The {@link TaskFactory#createInstance()} creates task instance
-   * that implements the main processing logic of the user application.
-   *
-   * @param factory the {@link TaskFactory} including the low-level task processing logic. The only allowed task factory
-   *                classes are {@link org.apache.samza.task.StreamTaskFactory} and {@link org.apache.samza.task.AsyncStreamTaskFactory}.
-   */
-  void setTaskFactory(TaskFactory factory);
-
-  /**
-   * Adds the input stream to the application.
-   *
-   * @param isd the {@link InputDescriptor}
-   */
-  void addInputStream(InputDescriptor isd);
-
-  /**
-   * Adds the output stream to the application.
-   *
-   * @param osd the {@link OutputDescriptor} of the output stream
-   */
-  void addOutputStream(OutputDescriptor osd);
-
-  /**
-   * Adds the {@link TableDescriptor} used in the application
-   *
-   * @param table {@link TableDescriptor}
-   */
-  void addTable(TableDescriptor table);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
new file mode 100644
index 0000000..b1e78b0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+
+
+/**
+ * The interface class to describe the configuration, input and output streams, and processing logic in a
+ * {@link org.apache.samza.application.SamzaApplication}.
+ * <p>
+ * Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for
+ * applications written in high-level {@link org.apache.samza.application.StreamApplication} and low-level
+ * {@link org.apache.samza.application.TaskApplication} APIs, respectively.
+ *
+ * @param <S> sub-class of user application descriptor.
+ */
+@InterfaceStability.Evolving
+public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
+
+  /**
+   * Get the {@link Config} of the application
+   * @return config of the application
+   */
+  Config getConfig();
+
+  /**
+   * Sets the {@link ApplicationContainerContextFactory} for this application. Each task will be given access to a
+   * different instance of the {@link org.apache.samza.context.ApplicationContainerContext} that this creates. The
+   * context can be accessed through the {@link org.apache.samza.context.Context}.
+   * <p>
+   * Setting this is optional.
+   *
+   * @param factory the {@link ApplicationContainerContextFactory} for this application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
+   * {@link ApplicationContainerContextFactory}
+   */
+  S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
+
+  /**
+   * Sets the {@link ApplicationTaskContextFactory} for this application. Each task will be given access to a different
+   * instance of the {@link org.apache.samza.context.ApplicationTaskContext} that this creates. The context can be
+   * accessed through the {@link org.apache.samza.context.Context}.
+   * <p>
+   * Setting this is optional.
+   *
+   * @param factory the {@link ApplicationTaskContextFactory} for this application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
+   * {@link ApplicationTaskContextFactory}
+   */
+  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);
+
+  /**
+   * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
+   *
+   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
+   * plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
+   * the application.
+   *
+   * @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
+   *                        with callback methods before and after the start/stop of each StreamProcessor in the application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
+   */
+  S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
+
+  /**
+   * Sets the customized {@link MetricsReporterFactory}s to be used in the application.
+   *
+   * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
+   */
+  S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
new file mode 100644
index 0000000..383e9ce
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.table.Table;
+
+
+/**
+ * The interface class to describe a {@link org.apache.samza.application.SamzaApplication} in high-level API in Samza.
+ */
+@InterfaceStability.Evolving
+public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
+
+  /**
+   * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
+   * {@code job.default.system} and its properties in configuration.
+   * <p>
+   * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
+   *
+   * @param defaultSystemDescriptor the default system descriptor to use
+   * @return the {@link StreamApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
+   */
+  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
+
+  /**
+   * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
+   * <p>
+   * A {@code MessageStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
+   * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with
+   * any other {@code Serde<M>}, can receive messages of type M - the key in the incoming message is ignored.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
+   * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from
+   * the framework.
+   * <p>
+   * Multiple invocations of this method with the same {@code inputDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param inputDescriptor the descriptor for the stream
+   * @param <M> the type of messages in the input {@link MessageStream}
+   * @return the input {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor}
+   */
+  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
+
+  /**
+   * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}.
+   * <p>
+   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
+   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any
+   * other {@code Serde<M>}, can send messages of type M without a key.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
+   * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from
+   * the framework.
+   * <p>
+   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key.
+   * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key.
+   * <p>
+   * Multiple invocations of this method with the same {@code outputDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param outputDescriptor the descriptor for the stream
+   * @param <M> the type of messages in the {@link OutputStream}
+   * @return the {@link OutputStream}
+   * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor}
+   */
+  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
+
+  /**
+   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
+   * <p>
+   * Multiple invocations of this method with the same {@link TableDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param tableDescriptor the {@link TableDescriptor}
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the {@link Table} corresponding to the {@code tableDescriptor}
+   * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor}
+   */
+  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
new file mode 100644
index 0000000..4730297
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ *  The interface to describe a {@link org.apache.samza.application.SamzaApplication} that uses low-level API task
+ *  for processing.
+ */
+@InterfaceStability.Evolving
+public interface TaskApplicationDescriptor extends ApplicationDescriptor<TaskApplicationDescriptor> {
+
+  /**
+   * Sets the {@link TaskFactory} for the user application. The {@link TaskFactory#createInstance()} creates task instance
+   * that implements the main processing logic of the user application.
+   *
+   * @param factory the {@link TaskFactory} including the low-level task processing logic. The only allowed task factory
+   *                classes are {@link org.apache.samza.task.StreamTaskFactory} and {@link org.apache.samza.task.AsyncStreamTaskFactory}.
+   */
+  void setTaskFactory(TaskFactory factory);
+
+  /**
+   * Adds the input stream to the application.
+   *
+   * @param isd the {@link InputDescriptor}
+   */
+  void addInputStream(InputDescriptor isd);
+
+  /**
+   * Adds the output stream to the application.
+   *
+   * @param osd the {@link OutputDescriptor} of the output stream
+   */
+  void addOutputStream(OutputDescriptor osd);
+
+  /**
+   * Adds the {@link TableDescriptor} used in the application.
+   *
+   * @param table {@link TableDescriptor}
+   */
+  void addTable(TableDescriptor table);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
index fbc2eef..074b0b4 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
@@ -29,7 +29,8 @@ import java.io.Serializable;
  * stage. At that stage, the framework-provided job-level and container-level contexts are available for creating the
  * {@link ApplicationContainerContext}.
  * <p>
- * This is {@link Serializable} because it is specified in {@link org.apache.samza.application.ApplicationDescriptor}.
+ * This is {@link Serializable} because it is specified in the
+ * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
  * @param <T> concrete type of {@link ApplicationContainerContext} returned by this factory
  */
 public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> extends Serializable {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
index af9ad68..619bbc7 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
@@ -29,7 +29,8 @@ import java.io.Serializable;
  * task. At that stage, the framework-provided job-level, container-level, and task-level contexts are available for
  * creating the {@link ApplicationTaskContext}. Also, the application-defined container-level context is available.
  * <p>
- * This is {@link Serializable} because it is specified in {@link org.apache.samza.application.ApplicationDescriptor}.
+ * This is {@link Serializable} because it is specified in the
+ * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
  * @param <T> concrete type of {@link ApplicationTaskContext} returned by this factory
  */
 public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> extends Serializable {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 97ac65d..f951a84 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -40,7 +40,7 @@ import org.apache.samza.table.Table;
  * A stream of messages that can be transformed into another {@link MessageStream}.
  * <p>
  * A {@link MessageStream} corresponding to an input stream can be obtained using
- * {@link org.apache.samza.application.StreamApplicationDescriptor#getInputStream}.
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#getInputStream}.
  *
  * @param <M> the type of messages in this stream
  */
@@ -214,7 +214,8 @@ public interface MessageStream<M> {
 
   /**
    * Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new
-   * intermediate stream on the default system provided via {@link org.apache.samza.application.StreamApplicationDescriptor#withDefaultSystem}.
+   * intermediate stream on the default system provided via
+   * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#withDefaultSystem}.
    * This intermediate stream is both an output and input to the job.
    * <p>
    * Uses the provided {@link KVSerde} for serialization of keys and values.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
deleted file mode 100644
index dbcd65e..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-/**
- * User facing class to collect metadata that fully describes a
- * Samza table. This interface should be implemented by concrete table implementations.
- * <p>
- * Typical user code should look like the following, notice <code>withConfig()</code>
- * is defined in this class and the rest in subclasses.
- *
- * <pre>
- * {@code
- * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
- *         KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
- *     .withBlockSize(1024)
- *     .withConfig("some-key", "some-value");
- * }
- * </pre>
-
- * Once constructed, a table descriptor can be registered with the system. Internally,
- * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec},
- * which is used to track tables internally.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor
- */
-@InterfaceStability.Unstable
-public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
-
-  /**
-   * Get the Id of the table
-   * @return Id of the table
-   */
-  String getTableId();
-
-  /**
-   * Add a configuration entry for the table
-   * @param key the key
-   * @param value the value
-   * @return this table descriptor instance
-   */
-  D withConfig(String key, String value);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
deleted file mode 100644
index 09dd381..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for a generic input stream.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link GenericSystemDescriptor}.
- * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
- * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- */
-public final class GenericInputDescriptor<StreamMessageType>
-    extends InputDescriptor<StreamMessageType, GenericInputDescriptor<StreamMessageType>> {
-  GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
-    super(streamId, serde, systemDescriptor, null);
-  }
-
-  @Override
-  public GenericInputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
-    return super.withPhysicalName(physicalName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
deleted file mode 100644
index 155bd4e..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for a generic output stream.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link GenericSystemDescriptor}.
- * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
- * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- */
-public final class GenericOutputDescriptor<StreamMessageType>
-    extends OutputDescriptor<StreamMessageType, GenericOutputDescriptor<StreamMessageType>> {
-  GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
-    super(streamId, serde, systemDescriptor);
-  }
-
-  @Override
-  public GenericOutputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
-    return super.withPhysicalName(physicalName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
deleted file mode 100644
index 24f7932..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-
-import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for a generic system.
- * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
- * Additional system specific properties may be provided using {@link #withSystemConfigs}.
- * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- */
-public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
-    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
-
-  /**
-   * Constructs a {@link GenericSystemDescriptor} instance with no system level serde.
-   * Serdes must be provided explicitly at stream level when getting input or output descriptors.
-   *
-   * @param systemName name of this system
-   * @param factoryClassName name of the SystemFactory class for this system
-   */
-  public GenericSystemDescriptor(String systemName, String factoryClassName) {
-    // NOTE(review): the two trailing nulls presumably leave the optional system level arguments of
-    // SystemDescriptor unset; confirm their meaning against that constructor.
-    super(systemName, factoryClassName, null, null);
-  }
-
-  /**
-   * Creates a new {@link GenericInputDescriptor} for the given stream on this system, using the provided
-   * stream level serde.
-   *
-   * @param streamId id of the input stream
-   * @param serde stream level serde for the input stream
-   * @param <StreamMessageType> type of messages in the input stream
-   * @return a new {@link GenericInputDescriptor} bound to this system descriptor
-   */
-  @Override
-  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new GenericInputDescriptor<>(streamId, this, serde);
-  }
-
-  /**
-   * Creates a new {@link GenericOutputDescriptor} for the given stream on this system, using the provided
-   * stream level serde.
-   *
-   * @param streamId id of the output stream
-   * @param serde stream level serde for the output stream
-   * @param <StreamMessageType> type of messages in the output stream
-   * @return a new {@link GenericOutputDescriptor} bound to this system descriptor
-   */
-  @Override
-  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new GenericOutputDescriptor<>(streamId, this, serde);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
deleted file mode 100644
index 708dd2a..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.stream;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.SystemStreamMetadata.OffsetType;
-
-/**
- * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class InputDescriptor<StreamMessageType, SubClass extends InputDescriptor<StreamMessageType, SubClass>>
-    extends StreamDescriptor<StreamMessageType, SubClass> {
-  // Config key templates; the placeholder is replaced with the stream id in toConfig().
-  private static final String RESET_OFFSET_CONFIG_KEY = "streams.%s.samza.reset.offset";
-  private static final String OFFSET_DEFAULT_CONFIG_KEY = "streams.%s.samza.offset.default";
-  private static final String PRIORITY_CONFIG_KEY = "streams.%s.samza.priority";
-  private static final String BOOTSTRAP_CONFIG_KEY = "streams.%s.samza.bootstrap";
-  private static final String BOUNDED_CONFIG_KEY = "streams.%s.samza.bounded";
-  private static final String DELETE_COMMITTED_MESSAGES_CONFIG_KEY = "streams.%s.samza.delete.committed.messages";
-
-  private final Optional<InputTransformer> transformerOptional;
-
-  // Each property stays empty until its setter is called; empty properties are omitted from toConfig().
-  private Optional<Boolean> resetOffsetOptional = Optional.empty();
-  private Optional<OffsetType> offsetDefaultOptional = Optional.empty();
-  private Optional<Integer> priorityOptional = Optional.empty();
-  private Optional<Boolean> isBootstrapOptional = Optional.empty();
-  private Optional<Boolean> isBoundedOptional = Optional.empty();
-  private Optional<Boolean> deleteCommittedMessagesOptional = Optional.empty();
-
-  /**
-   * Constructs an {@link InputDescriptor} instance.
-   *
-   * @param streamId id of the stream
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   * @param transformer stream level input stream transform function if available, else null
-   */
-  public InputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor, InputTransformer transformer) {
-    super(streamId, serde, systemDescriptor);
-
-    // stream level transformer takes precedence over system level transformer
-    if (transformer != null) {
-      this.transformerOptional = Optional.of(transformer);
-    } else {
-      this.transformerOptional = systemDescriptor.getTransformer();
-    }
-  }
-
-  /**
-   * If set, when a Samza container starts up, it ignores any checkpointed offset for this particular
-   * input stream. Its behavior is thus determined by the {@link #withOffsetDefault} setting.
-   * Note that the reset takes effect every time a container is started, which may be every time you restart your job,
-   * or more frequently if a container fails and is restarted by the framework.
-   *
-   * @return this input descriptor
-   */
-  public SubClass shouldResetOffset() {
-    this.resetOffsetOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * If a container starts up without a checkpoint, this property determines where in the input stream we should start
-   * consuming. The value must be an OffsetType, one of the following:
-   * <ul>
-   *  <li>upcoming: Start processing messages that are published after the job starts.
-   *                Any messages published while the job was not running are not processed.
-   *  <li>oldest: Start processing at the oldest available message in the system,
-   *              and reprocess the entire available message history.
-   * </ul>
-   * This property is for an individual stream. To set it for all streams within a system, see
-   * {@link SystemDescriptor#withDefaultStreamOffsetDefault}. If both are defined, the stream-level definition
-   * takes precedence.
-   *
-   * @param offsetDefault offset type to start processing from; a null value clears the property
-   * @return this input descriptor
-   */
-  public SubClass withOffsetDefault(OffsetType offsetDefault) {
-    this.offsetDefaultOptional = Optional.ofNullable(offsetDefault);
-    return (SubClass) this;
-  }
-
-  /**
-   * If one or more streams have a priority set (any positive integer), they will be processed with higher priority
-   * than the other streams.
-   * <p>
-   * You can set several streams to the same priority, or define multiple priority levels by assigning a
-   * higher number to the higher-priority streams.
-   * <p>
-   * If a higher-priority stream has any messages available, they will always be processed first;
-   * messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs.
-   *
-   * @param priority priority for this input stream
-   * @return this input descriptor
-   */
-  public SubClass withPriority(int priority) {
-    this.priorityOptional = Optional.of(priority);
-    return (SubClass) this;
-  }
-
-  /**
-   * If set, this stream will be processed as a bootstrap stream. This means that every time a Samza container
-   * starts up, this stream will be fully consumed before messages from any other stream are processed.
-   *
-   * @return this input descriptor
-   */
-  public SubClass shouldBootstrap() {
-    this.isBootstrapOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * If set, this stream will be considered a bounded stream. If all input streams in an application are
-   * bounded, the job is considered to be running in batch processing mode.
-   * <p>
-   * Despite its name, this method is a setter: it marks the stream as bounded and returns the descriptor.
-   *
-   * @return this input descriptor
-   */
-  public SubClass isBounded() {
-    this.isBoundedOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * If set, and supported by the system implementation, messages older than the latest checkpointed offset
-   * for this stream may be deleted after the commit.
-   *
-   * @return this input descriptor
-   */
-  public SubClass shouldDeleteCommittedMessages() {
-    this.deleteCommittedMessagesOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * Gets the transformer for this input stream: the stream level transformer passed to the constructor if it
-   * was non-null, otherwise whatever the system descriptor's {@code getTransformer()} returned.
-   *
-   * @return the transformer selected at construction time
-   */
-  public Optional<InputTransformer> getTransformer() {
-    return this.transformerOptional;
-  }
-
-  /**
-   * Builds the configuration for this input stream: the entries produced by the super class plus one entry
-   * for each input stream property that has been set on this descriptor. Properties that were never set are
-   * not emitted.
-   *
-   * @return an unmodifiable map of config keys to values
-   */
-  @Override
-  public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>(super.toConfig());
-    String streamId = getStreamId();
-    // Lower-case with Locale.ROOT: the default-locale overload would turn "UPCOMING" into "upcomıng"
-    // (dotless i) on a JVM running with a Turkish locale, producing an invalid config value.
-    this.offsetDefaultOptional.ifPresent(od ->
-        configs.put(String.format(OFFSET_DEFAULT_CONFIG_KEY, streamId),
-            od.name().toLowerCase(java.util.Locale.ROOT)));
-    this.resetOffsetOptional.ifPresent(resetOffset ->
-        configs.put(String.format(RESET_OFFSET_CONFIG_KEY, streamId), Boolean.toString(resetOffset)));
-    this.priorityOptional.ifPresent(priority ->
-        configs.put(String.format(PRIORITY_CONFIG_KEY, streamId), Integer.toString(priority)));
-    this.isBootstrapOptional.ifPresent(bootstrap ->
-        configs.put(String.format(BOOTSTRAP_CONFIG_KEY, streamId), Boolean.toString(bootstrap)));
-    this.isBoundedOptional.ifPresent(bounded ->
-        configs.put(String.format(BOUNDED_CONFIG_KEY, streamId), Boolean.toString(bounded)));
-    this.deleteCommittedMessagesOptional.ifPresent(deleteCommittedMessages ->
-        configs.put(String.format(DELETE_COMMITTED_MESSAGES_CONFIG_KEY, streamId),
-            Boolean.toString(deleteCommittedMessages)));
-    return Collections.unmodifiableMap(configs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
deleted file mode 100644
index 20bbc53..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.stream;
-
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * The base descriptor for an output stream. Allows setting properties that are common to all output streams.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class OutputDescriptor<StreamMessageType, SubClass extends OutputDescriptor<StreamMessageType, SubClass>>
-    extends StreamDescriptor<StreamMessageType, SubClass> {
-  /**
-   * Constructs an {@link OutputDescriptor} instance.
-   * <p>
-   * All arguments are passed unchanged to the {@link StreamDescriptor} constructor, which rejects a null
-   * {@code systemDescriptor}, an invalid {@code streamId} and a null {@code serde}.
-   *
-   * @param streamId id of the stream
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   */
-  public OutputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
deleted file mode 100644
index f7de728..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.stream;
-
-import com.google.common.base.Preconditions;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class StreamDescriptor<StreamMessageType, SubClass extends StreamDescriptor<StreamMessageType, SubClass>> {
-  private static final String SYSTEM_CONFIG_KEY = "streams.%s.samza.system";
-  private static final String PHYSICAL_NAME_CONFIG_KEY = "streams.%s.samza.physical.name";
-  private static final String STREAM_CONFIGS_CONFIG_KEY = "streams.%s.%s";
-  private static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
-  private final String streamId;
-  private final Serde serde;
-  private final SystemDescriptor systemDescriptor;
-
-  private final Map<String, String> streamConfigs = new HashMap<>();
-  private Optional<String> physicalNameOptional = Optional.empty();
-
-  /**
-   * Constructs a {@link StreamDescriptor} instance.
-   *
-   * @param streamId id of the stream
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   */
-  StreamDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
-    Preconditions.checkArgument(systemDescriptor != null,
-        String.format("SystemDescriptor must not be null. streamId: %s", streamId));
-    String systemName = systemDescriptor.getSystemName();
-    Preconditions.checkState(isValidStreamId(streamId),
-        String.format("streamId must be non-empty and must not contain spaces or special characters. " +
-            "streamId: %s, systemName: %s", streamId, systemName));
-    Preconditions.checkArgument(serde != null,
-        String.format("Serde must not be null. streamId: %s systemName: %s", streamId, systemName));
-    this.streamId = streamId;
-    this.serde = serde;
-    this.systemDescriptor = systemDescriptor;
-  }
-
-  /**
-   * The physical name of the stream on the system on which this stream will be accessed.
-   * This is opposed to the {@code streamId} which is the logical name that Samza uses to identify the stream.
-   * <p>
-   * A physical name could be a Kafka topic name, an HDFS file URN, or any other system-specific identifier.
-   * <p>
-   * If not provided, the logical {@code streamId} is used as the physical name.
-   *
-   * @param physicalName physical name for this stream.
-   * @return this stream descriptor.
-   */
-  protected SubClass withPhysicalName(String physicalName) {
-    this.physicalNameOptional = Optional.ofNullable(physicalName);
-    return (SubClass) this;
-  }
-
-  /**
-   * Additional system-specific properties for this stream.
-   * <p>
-   * These properties are added under the {@code streams.stream-id.*} scope.
-   *
-   * @param streamConfigs system-specific properties for this stream
-   * @return this stream descriptor
-   */
-  public SubClass withStreamConfigs(Map<String, String> streamConfigs) {
-    this.streamConfigs.putAll(streamConfigs);
-    return (SubClass) this;
-  }
-
-  public String getStreamId() {
-    return this.streamId;
-  }
-
-  public String getSystemName() {
-    return this.systemDescriptor.getSystemName();
-  }
-
-  public Serde getSerde() {
-    return this.serde;
-  }
-
-  public SystemDescriptor getSystemDescriptor() {
-    return this.systemDescriptor;
-  }
-
-  public Optional<String> getPhysicalName() {
-    return physicalNameOptional;
-  }
-
-  private boolean isValidStreamId(String id) {
-    return StringUtils.isNotBlank(id) && STREAM_ID_PATTERN.matcher(id).matches();
-  }
-
-  public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>();
-    configs.put(String.format(SYSTEM_CONFIG_KEY, streamId), getSystemName());
-    this.physicalNameOptional.ifPresent(physicalName ->
-        configs.put(String.format(PHYSICAL_NAME_CONFIG_KEY, streamId), physicalName));
-    this.streamConfigs.forEach((key, value) ->
-        configs.put(String.format(STREAM_CONFIGS_CONFIG_KEY, streamId, key), value));
-    return Collections.unmodifiableMap(configs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
deleted file mode 100644
index 05179dd..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
- * their own {@code StreamExpander} function result types.
- *
- * @param <StreamExpanderType> type of the system level {@code StreamExpander} results
- */
-public interface ExpandingInputDescriptorProvider<StreamExpanderType> {
-
-  /**
-   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
-   * stream level serde, and the default system level {@code StreamExpander}.
-   * <p>
-   * The type of messages in the stream is the type of messages returned by the default system level
-   * {@code StreamExpander}.
-   * <p>
-   * Note that both the {@code serde} parameter and the wildcard bound of the return type are declared
-   * as raw types.
-   *
-   * @param streamId id of the input stream
-   * @param serde stream level serde to be propagated to expanded input streams
-   * @return an {@link InputDescriptor} for the input stream
-   */
-  InputDescriptor<StreamExpanderType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
deleted file mode 100644
index c2ceb53..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-
-/**
- * Interface for simple {@code SystemDescriptors} that return {@code OutputDescriptors} parameterized by the type of
- * the provided stream level serde.
- */
-public interface OutputDescriptorProvider {
-
-  /**
-   * Gets an {@link OutputDescriptor} representing an output stream on this system that uses the provided
-   * stream specific serde instead of the default system serde.
-   * <p>
-   * An {@code OutputStream<KV<K, V>>}, obtained using a descriptor with a {@code KVSerde<K, V>}, can send messages
-   * of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other {@code Serde<M>} can send messages of
-   * type {@code M} without a key.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemProducer}
-   * serializes the outgoing messages itself, and no prior serialization is required from the framework.
-   * <p>
-   * Note that the wildcard bound of the return type is declared as a raw {@link OutputDescriptor}.
-   *
-   * @param streamId id of the output stream
-   * @param serde serde for this output stream that overrides the default system serde, if any.
-   * @param <StreamMessageType> type of messages in the output stream
-   * @return the {@link OutputDescriptor} for the output stream
-   */
-  <StreamMessageType> OutputDescriptor<StreamMessageType, ? extends OutputDescriptor> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde);
-}