You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:46 UTC

[06/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
new file mode 100644
index 0000000..0187b2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.util.EmbeddedTaggedRateLimiter;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for remote store backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
+  /**
+   * Tag to be used for provision credits for rate limiting read operations from the remote table.
+   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+   * TableRateLimiter.CreditFunction)}
+   */
+  public static final String RL_READ_TAG = "readTag";
+
+  /**
+   * Tag to be used for provision credits for rate limiting write operations into the remote table.
+   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+   * TableRateLimiter.CreditFunction)} and it needs the write functionality.
+   */
+  public static final String RL_WRITE_TAG = "writeTag";
+
+  // Input support for a specific remote store (required)
+  private TableReadFunction<K, V> readFn;
+
+  // Output support for a specific remote store (optional)
+  private TableWriteFunction<K, V> writeFn;
+
+  // Rate limiter for client-side throttling;
+  // can either be constructed indirectly from rates or overridden by withRateLimiter()
+  private RateLimiter rateLimiter;
+
+  // Rates for constructing the default rate limiter when they are non-zero
+  private Map<String, Integer> tagCreditsMap = new HashMap<>();
+
+  private TableRateLimiter.CreditFunction<K, V> readCreditFn;
+  private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
+
+  private TableRetryPolicy readRetryPolicy;
+  private TableRetryPolicy writeRetryPolicy;
+
+  // By default execute future callbacks on the native client threads
+  // ie. no additional thread pool for callbacks.
+  private int asyncCallbackPoolSize = -1;
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  public RemoteTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    // Serialize and store reader/writer functions
+    tableSpecConfig.put(RemoteTableProvider.READ_FN, SerdeUtils.serialize("read function", readFn));
+
+    if (writeFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
+    }
+
+    // Serialize the rate limiter if specified
+    if (!tagCreditsMap.isEmpty()) {
+      rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
+    }
+
+    if (rateLimiter != null) {
+      tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
+    }
+
+    // Serialize the readCredit and writeCredit functions
+    if (readCreditFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
+          "read credit function", readCreditFn));
+    }
+
+    if (writeCreditFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
+          "write credit function", writeCreditFn));
+    }
+
+    if (readRetryPolicy != null) {
+      tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
+          "read retry policy", readRetryPolicy));
+    }
+
+    if (writeRetryPolicy != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
+          "write retry policy", writeRetryPolicy));
+    }
+
+    tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
+
+    return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  /**
+   * Use specified TableReadFunction with remote table and a retry policy.
+   * @param readFn read function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.readFn = readFn;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table and a retry policy.
+   * @param writeFn write function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    this.writeFn = writeFn;
+    return this;
+  }
+
+  /**
+   * Use specified TableReadFunction with remote table.
+   * @param readFn read function instance
+   * @param retryPolicy retry policy for the read function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.readFn = readFn;
+    this.readRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table.
+   * @param writeFn write function instance
+   * @param retryPolicy retry policy for the write function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.writeFn = writeFn;
+    this.writeRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
+   * of credits to be charged from the rate limiter for table read and write operations.
+   * This is an advanced API that provides greater flexibility to throttle each record in the table
+   * with different number of credits. For most common use-cases eg: limit the number of read/write
+   * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
+   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
+   *
+   * @param rateLimiter rate limiter instance to be used for throttling
+   * @param readCreditFn credit function for rate limiting read operations
+   * @param writeCreditFn credit function for rate limiting write operations
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
+      TableRateLimiter.CreditFunction<K, V> readCreditFn,
+      TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
+    Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.writeCreditFn = writeCreditFn;
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table read operations. If the read rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for read operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
+    tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table write operations. If the write rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for write operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
+    tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the size of the thread pool for the executor used to execute
+   * callbacks of CompletableFutures of async Table operations. By default, these
+   * futures are completed (called) by the threads of the native store client. Depending
+   * on the implementation of the native client, it may or may not allow executing long
+   * running operations in the callbacks. This config can be used to execute the callbacks
+   * from a separate executor to decouple from the native client. If configured, this
+   * thread pool is shared by all read and write operations.
+   * @param poolSize max number of threads in the executor for async callbacks
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
+    this.asyncCallbackPoolSize = poolSize;
+    return this;
+  }
+
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+    Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
+        "Only one of rateLimiter instance or read/write limits can be specified");
+    // Assume callback executor pool should have no more than 20 threads
+    Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
+        "too many threads for async callback executor.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java
new file mode 100644
index 0000000..fdccc70
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote.descriptors;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteReadableTable;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.util.RateLimiter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Provide for remote table instances
+ */
+public class RemoteTableProvider extends BaseTableProvider {
+
+  static final String READ_FN = "io.read.func";
+  static final String WRITE_FN = "io.write.func";
+  static final String RATE_LIMITER = "io.ratelimiter";
+  static final String READ_CREDIT_FN = "io.read.credit.func";
+  static final String WRITE_CREDIT_FN = "io.write.credit.func";
+  static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+  static final String READ_RETRY_POLICY = "io.read.retry.policy";
+  static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
+
+  private final boolean readOnly;
+  private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
+
+  /**
+   * Map of tableId -> executor service for async table IO and callbacks. The same executors
+   * are shared by both read/write operations such that tables of the same tableId all share
+   * the set same of executors globally whereas table itself is per-task.
+   */
+  private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
+  private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+  private static ScheduledExecutorService retryExecutor;
+
+  public RemoteTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+    this.readOnly = !tableSpec.getConfig().containsKey(WRITE_FN);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Table getTable() {
+    RemoteReadableTable table;
+    String tableId = tableSpec.getId();
+
+    TableReadFunction readFn = getReadFn();
+    RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
+    if (rateLimiter != null) {
+      rateLimiter.init(this.context);
+    }
+    TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
+    TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
+
+    TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
+    TableRateLimiter writeRateLimiter = null;
+
+    TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
+    TableRetryPolicy writeRetryPolicy = null;
+
+    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
+      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setName("table-retry-executor");
+          thread.setDaemon(true);
+          return thread;
+        });
+    }
+
+    if (readRetryPolicy != null) {
+      readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
+    }
+
+    TableWriteFunction writeFn = getWriteFn();
+
+    boolean isRateLimited = readRateLimiter.isRateLimited();
+    if (!readOnly) {
+      writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
+      writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
+      isRateLimited |= writeRateLimiter.isRateLimited();
+      writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY);
+      if (writeRetryPolicy != null) {
+        writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
+      }
+    }
+
+    // Optional executor for future callback/completion. Shared by both read and write operations.
+    int callbackPoolSize = Integer.parseInt(tableSpec.getConfig().get(ASYNC_CALLBACK_POOL_SIZE));
+    if (callbackPoolSize > 0) {
+      callbackExecutors.computeIfAbsent(tableId, (arg) ->
+          Executors.newFixedThreadPool(callbackPoolSize, (runnable) -> {
+              Thread thread = new Thread(runnable);
+              thread.setName("table-" + tableId + "-async-callback-pool");
+              thread.setDaemon(true);
+              return thread;
+            }));
+    }
+
+    if (isRateLimited) {
+      tableExecutors.computeIfAbsent(tableId, (arg) ->
+          Executors.newSingleThreadExecutor(runnable -> {
+              Thread thread = new Thread(runnable);
+              thread.setName("table-" + tableId + "-async-executor");
+              thread.setDaemon(true);
+              return thread;
+            }));
+    }
+
+    if (readOnly) {
+      table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
+          tableExecutors.get(tableId), callbackExecutors.get(tableId));
+    } else {
+      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
+          writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
+    }
+
+    TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
+    if (readRetryPolicy != null) {
+      ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
+    }
+    if (writeRetryPolicy != null) {
+      ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
+    }
+
+    table.init(this.context);
+    tables.add(table);
+    return table;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    tables.forEach(t -> t.close());
+    tableExecutors.values().forEach(e -> e.shutdown());
+    callbackExecutors.values().forEach(e -> e.shutdown());
+  }
+
+  private <T> T deserializeObject(String key) {
+    String entry = tableSpec.getConfig().getOrDefault(key, "");
+    if (entry.isEmpty()) {
+      return null;
+    }
+    return SerdeUtils.deserialize(key, entry);
+  }
+
+  private TableReadFunction<?, ?> getReadFn() {
+    TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
+    if (readFn != null) {
+      readFn.init(this.context);
+    }
+    return readFn;
+  }
+
+  private TableWriteFunction<?, ?> getWriteFn() {
+    TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
+    if (writeFn != null) {
+      writeFn.init(this.context);
+    }
+    return writeFn;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java
new file mode 100644
index 0000000..4802265
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote.descriptors;
+
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory class for a remote table provider
+ */
+public class RemoteTableProviderFactory implements TableProviderFactory {
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    Preconditions.checkNotNull(tableSpec, "null table spec");
+    return new RemoteTableProvider(tableSpec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
index 1adddc0..8f7aa7e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
@@ -39,7 +39,7 @@ import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
 /**
  * Wrapper for a {@link TableReadFunction} instance to add common retry
  * support with a {@link TableRetryPolicy}. This wrapper is created by
- * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * {@link org.apache.samza.table.remote.descriptors.RemoteTableProvider} when a retry
  * policy is specified together with the {@link TableReadFunction}.
  *
  * Actual retry mechanism is provided by the failsafe library. Retry is

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
index 2f3f062..ee7959a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
@@ -39,7 +39,7 @@ import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
 /**
  * Wrapper for a {@link TableWriteFunction} instance to add common retry
  * support with a {@link TableRetryPolicy}. This wrapper is created by
- * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * {@link org.apache.samza.table.remote.descriptors.RemoteTableProvider} when a retry
  * policy is specified together with the {@link TableWriteFunction}.
  *
  * Actual retry mechanism is provided by the failsafe library. Retry is

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
deleted file mode 100644
index dfbd835..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Base class for all table provider implementations.
- */
-abstract public class BaseTableProvider implements TableProvider {
-
-  final protected Logger logger = LoggerFactory.getLogger(getClass());
-
-  final protected TableSpec tableSpec;
-
-  protected Context context;
-
-  public BaseTableProvider(TableSpec tableSpec) {
-    this.tableSpec = tableSpec;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void init(Context context) {
-    this.context = context;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
-    Map<String, String> tableConfig = new HashMap<>();
-
-    // Insert table_id prefix to config entries
-    tableSpec.getConfig().forEach((k, v) -> {
-        String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
-        tableConfig.put(realKey, v);
-      });
-
-    logger.info("Generated configuration for table " + tableSpec.getId());
-
-    return tableConfig;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java
new file mode 100644
index 0000000..6f787d1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for all table provider implementations.
+ */
+abstract public class BaseTableProvider implements TableProvider {
+
+  final protected Logger logger = LoggerFactory.getLogger(getClass());
+
+  final protected TableSpec tableSpec;
+
+  protected Context context;
+
+  public BaseTableProvider(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void init(Context context) {
+    this.context = context;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+    Map<String, String> tableConfig = new HashMap<>();
+
+    // Insert table_id prefix to config entries
+    tableSpec.getConfig().forEach((k, v) -> {
+        String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+        tableConfig.put(realKey, v);
+      });
+
+    logger.info("Generated configuration for table " + tableSpec.getId());
+
+    return tableConfig;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 218ba5d..87131d7 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -64,11 +64,12 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
    * Initializes this task during startup.
    * <p>
    * Implementation: Initializes the runtime {@link OperatorImplGraph} according to user-defined {@link OperatorSpecGraph}.
-   * Users set the input and output streams and the task-wide context manager using {@link org.apache.samza.application.StreamApplicationDescriptor} APIs,
-   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. After the
-   * {@link org.apache.samza.application.StreamApplicationDescriptorImpl} is initialized once by the application, it then creates
-   * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph}
-   * corresponding to the logical DAG.
+   * Users set the input and output streams and the task-wide context manager using
+   * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor} APIs, and the logical transforms
+   * using the {@link org.apache.samza.operators.MessageStream} APIs. After the
+   * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl} is initialized once by the
+   * application, it then creates an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this
+   * class to create the {@link OperatorImplGraph} corresponding to the logical DAG.
    *
    * @param context allows initializing and accessing contextual data of this StreamTask
    * @throws Exception in case of initialization errors

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index c312fac..b2297e1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -21,10 +21,10 @@ package org.apache.samza.task;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.application.TaskApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 929d6a4..ee6aff3 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,7 +19,8 @@
 
 package org.apache.samza.job.local
 
-import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil}
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.{Config, TaskConfigJava}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
index ccd88b8..8b96c8a 100644
--- a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
+++ b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.application;
 
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+
 /**
  * Test class of {@link StreamApplication} for unit tests
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index 9b590c4..ab91cee 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -25,6 +25,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
 import org.apache.samza.task.MockStreamTask;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
deleted file mode 100644
index de16ef2..0000000
--- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit test for {@link StreamApplicationDescriptorImpl}
- */
-public class TestStreamApplicationDescriptorImpl {
-
-  @Test
-  public void testConstructor() {
-    StreamApplication mockApp = mock(StreamApplication.class);
-    Config mockConfig = mock(Config.class);
-    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(mockApp, mockConfig);
-    verify(mockApp).describe(appDesc);
-    assertEquals(mockConfig, appDesc.config);
-  }
-
-  @Test
-  public void testGetInputStreamWithValueSerde() {
-
-    String streamId = "test-stream-1";
-    Serde mockValueSerde = mock(Serde.class);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-      }, mock(Config.class));
-
-    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test
-  public void testGetInputStreamWithKeyValueSerde() {
-
-    String streamId = "test-stream-1";
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-      }, mock(Config.class));
-
-    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
-    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetInputStreamWithNullSerde() {
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-      }, mock(Config.class));
-  }
-
-  @Test
-  public void testGetInputStreamWithTransformFunction() {
-    String streamId = "test-stream-1";
-    Serde mockValueSerde = mock(Serde.class);
-    InputTransformer transformer = ime -> ime;
-    MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer);
-    MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-      }, mock(Config.class));
-
-    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
-    assertEquals(transformer, inputOpSpec.getTransformer());
-  }
-
-  @Test
-  public void testGetInputStreamWithExpandingSystem() {
-    String streamId = "test-stream-1";
-    String expandedStreamId = "expanded-stream";
-    AtomicInteger expandCallCount = new AtomicInteger();
-    StreamExpander expander = (sg, isd) -> {
-      expandCallCount.incrementAndGet();
-      InputDescriptor expandedISD =
-          new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
-              .getInputDescriptor(expandedStreamId, new IntegerSerde());
-
-      return sg.getInputStream(expandedISD);
-    };
-    MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander);
-    MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-      }, mock(Config.class));
-
-    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(1, expandCallCount.get());
-    assertFalse(streamAppDesc.getInputOperators().containsKey(streamId));
-    assertFalse(streamAppDesc.getInputDescriptors().containsKey(streamId));
-    assertTrue(streamAppDesc.getInputDescriptors().containsKey(expandedStreamId));
-    assertEquals(expandedStreamId, inputOpSpec.getStreamId());
-  }
-
-  @Test
-  public void testGetInputStreamWithRelaxedTypes() {
-    String streamId = "test-stream-1";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-      }, mock(Config.class));
-
-    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
-  }
-
-  @Test
-  public void testMultipleGetInputStreams() {
-    String streamId1 = "test-stream-1";
-    String streamId2 = "test-stream-2";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class));
-    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class));
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd1);
-        appDesc.getInputStream(isd2);
-      }, mock(Config.class));
-
-    InputOperatorSpec inputOpSpec1 = streamAppDesc.getInputOperators().get(streamId1);
-    InputOperatorSpec inputOpSpec2 = streamAppDesc.getInputOperators().get(streamId2);
-
-    assertEquals(2, streamAppDesc.getInputOperators().size());
-    assertEquals(streamId1, inputOpSpec1.getStreamId());
-    assertEquals(streamId2, inputOpSpec2.getStreamId());
-    assertEquals(2, streamAppDesc.getInputDescriptors().size());
-    assertEquals(isd1, streamAppDesc.getInputDescriptors().get(streamId1));
-    assertEquals(isd2, streamAppDesc.getInputDescriptors().get(streamId2));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameInputStreamTwice() {
-    String streamId = "test-stream-1";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class));
-    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class));
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd1);
-        // should throw exception
-        appDesc.getInputStream(isd2);
-      }, mock(Config.class));
-  }
-
-  @Test
-  public void testMultipleSystemDescriptorForSameSystemName() {
-    GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd1 = sd1.getInputDescriptor("test-stream-1", mock(Serde.class));
-    GenericInputDescriptor isd2 = sd2.getInputDescriptor("test-stream-2", mock(Serde.class));
-    GenericOutputDescriptor osd1 = sd2.getOutputDescriptor("test-stream-3", mock(Serde.class));
-
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd1);
-        try {
-          appDesc.getInputStream(isd2);
-          fail("Adding input stream with the same system name but different SystemDescriptor should have failed");
-        } catch (IllegalStateException e) { }
-
-        try {
-          appDesc.getOutputStream(osd1);
-          fail("adding output stream with the same system name but different SystemDescriptor should have failed");
-        } catch (IllegalStateException e) { }
-      }, mock(Config.class));
-
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.withDefaultSystem(sd2);
-        try {
-          appDesc.getInputStream(isd1);
-          fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
-        } catch (IllegalStateException e) { }
-      }, mock(Config.class));
-  }
-
-  @Test
-  public void testGetOutputStreamWithKeyValueSerde() {
-    String streamId = "test-stream-1";
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde);
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getOutputStream(osd);
-      }, mock(Config.class));
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
-    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetOutputStreamWithNullSerde() {
-    String streamId = "test-stream-1";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getOutputStream(osd);
-      }, mock(Config.class));
-  }
-
-  @Test
-  public void testGetOutputStreamWithValueSerde() {
-    String streamId = "test-stream-1";
-    Serde mockValueSerde = mock(Serde.class);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde);
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getOutputStream(osd);
-      }, mock(Config.class));
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
-    String streamId = "test-stream-1";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
-
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(isd);
-        appDesc.withDefaultSystem(sd); // should throw exception
-      }, mock(Config.class));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
-    String streamId = "test-stream-1";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mock(Serde.class));
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getOutputStream(osd);
-        appDesc.withDefaultSystem(sd); // should throw exception
-      }, mock(Config.class));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
-    String streamId = "test-stream-1";
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-    streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
-    streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class)); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameOutputStreamTwice() {
-    String streamId = "test-stream-1";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class));
-    GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class));
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getOutputStream(osd1);
-        appDesc.getOutputStream(osd2); // should throw exception
-      }, mock(Config.class));
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithValueSerde() {
-    String streamId = "stream-1";
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        streamAppDesc.getIntermediateStream(streamId, mockValueSerde, false);
-
-    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithKeyValueSerde() {
-    String streamId = "streamId";
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        streamAppDesc.getIntermediateStream(streamId, mockKVSerde, false);
-
-    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
-    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
-    Config mockConfig = mock(Config.class);
-    String streamId = "streamId";
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mock-system", "mock-system-factory");
-    streamAppDesc.withDefaultSystem(sd);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
-
-    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testGetIntermediateStreamWithNoSerde() {
-    Config mockConfig = mock(Config.class);
-    String streamId = "streamId";
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        streamAppDesc.getIntermediateStream(streamId, null, false); // should throw
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameIntermediateStreamTwice() {
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-    streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
-    // should throw exception
-    streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
-  }
-
-  @Test
-  public void testGetNextOpIdIncrementsId() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
-    assertEquals("jobName-1234-merge-0", streamAppDesc.getNextOpId(OpCode.MERGE, null));
-    assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
-    assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, null));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetNextOpIdRejectsDuplicates() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
-    assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
-    streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
-  }
-
-  @Test
-  public void testOpIdValidation() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
-
-    // null and empty userDefinedIDs should fall back to autogenerated IDs.
-    try {
-      streamAppDesc.getNextOpId(OpCode.FILTER, null);
-      streamAppDesc.getNextOpId(OpCode.FILTER, "");
-      streamAppDesc.getNextOpId(OpCode.FILTER, " ");
-      streamAppDesc.getNextOpId(OpCode.FILTER, "\t");
-    } catch (SamzaException e) {
-      fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
-    }
-
-    List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID");
-    for (String validOpId: validOpIds) {
-      try {
-        streamAppDesc.getNextOpId(OpCode.FILTER, validOpId);
-      } catch (Exception e) {
-        fail("Received an exception with a valid operator ID: " + validOpId);
-      }
-    }
-
-    List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
-    for (String invalidOpId: invalidOpIds) {
-      try {
-        streamAppDesc.getNextOpId(OpCode.FILTER, invalidOpId);
-        fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
-      } catch (SamzaException e) { }
-    }
-  }
-
-  @Test
-  public void testGetInputStreamPreservesInsertionOrder() {
-    Config mockConfig = mock(Config.class);
-
-    String testStreamId1 = "test-stream-1";
-    String testStreamId2 = "test-stream-2";
-    String testStreamId3 = "test-stream-3";
-
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class)));
-        appDesc.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class)));
-        appDesc.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class)));
-      }, mockConfig);
-
-    List<InputOperatorSpec> inputSpecs = new ArrayList<>(streamAppDesc.getInputOperators().values());
-    assertEquals(inputSpecs.size(), 3);
-    assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
-    assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
-    assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
-  }
-
-  @Test
-  public void testGetTable() throws Exception {
-    Config mockConfig = mock(Config.class);
-
-    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
-    when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
-    when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
-    when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getTable(mockTableDescriptor);
-      }, mockConfig);
-    assertNotNull(streamAppDesc.getTables().get(testTableSpec.getId()));
-  }
-
-  @Test
-  public void testApplicationContainerContextFactory() {
-    ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
-    StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
-  }
-
-  @Test
-  public void testNoApplicationContainerContextFactory() {
-    StreamApplication testApp = appDesc -> {
-    };
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
-  }
-
-  @Test
-  public void testApplicationTaskContextFactory() {
-    ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
-    StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
-  }
-
-  @Test
-  public void testNoApplicationTaskContextFactory() {
-    StreamApplication testApp = appDesc -> {
-    };
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
-  }
-
-  @Test
-  public void testProcessorLifecycleListenerFactory() {
-    ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
-    StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
-    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetTableWithBadId() {
-    Config mockConfig = mock(Config.class);
-    new StreamApplicationDescriptorImpl(appDesc -> {
-        BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-        when(mockTableDescriptor.getTableId()).thenReturn("my.table");
-        appDesc.getTable(mockTableDescriptor);
-      }, mockConfig);
-  }
-
-  class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
-    public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
-      super(systemName, "factory.class", null, expander);
-    }
-
-    @Override
-    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
-      return new MockInputDescriptor<>(streamId, this, serde);
-    }
-  }
-
-  class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> {
-    public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) {
-      super(systemName, "factory.class", transformer, null);
-    }
-
-    @Override
-    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
-      return new MockInputDescriptor<>(streamId, this, serde);
-    }
-  }
-
-  public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
-    MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
-      super(streamId, serde, systemDescriptor, null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
deleted file mode 100644
index e79e25b..0000000
--- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.task.TaskFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit test for {@link TaskApplicationDescriptorImpl}
- */
-public class TestTaskApplicationDescriptorImpl {
-
-  private Config config = mock(Config.class);
-  private String defaultSystemName = "test-system";
-  private SystemDescriptor defaultSystemDescriptor = mock(SystemDescriptor.class);
-  private List<InputDescriptor> mockInputs = new ArrayList<InputDescriptor>() { {
-      InputDescriptor mock1 = mock(InputDescriptor.class);
-      InputDescriptor mock2 = mock(InputDescriptor.class);
-      when(mock1.getStreamId()).thenReturn("test-input1");
-      when(mock2.getStreamId()).thenReturn("test-input2");
-      this.add(mock1);
-      this.add(mock2);
-    } };
-  private List<OutputDescriptor> mockOutputs = new ArrayList<OutputDescriptor>() { {
-      OutputDescriptor mock1 = mock(OutputDescriptor.class);
-      OutputDescriptor mock2 = mock(OutputDescriptor.class);
-      when(mock1.getStreamId()).thenReturn("test-output1");
-      when(mock2.getStreamId()).thenReturn("test-output2");
-      this.add(mock1);
-      this.add(mock2);
-    } };
-  private Set<TableDescriptor> mockTables = new HashSet<TableDescriptor>() { {
-      BaseTableDescriptor mock1 = mock(BaseTableDescriptor.class);
-      BaseTableDescriptor mock2 = mock(BaseTableDescriptor.class);
-      when(mock1.getTableId()).thenReturn("test-table1");
-      when(mock2.getTableId()).thenReturn("test-table2");
-      when(mock1.getSerde()).thenReturn(mock(KVSerde.class));
-      when(mock2.getSerde()).thenReturn(mock(KVSerde.class));
-      this.add(mock1);
-      this.add(mock2);
-    } };
-
-  @Before
-  public void setUp() {
-    when(defaultSystemDescriptor.getSystemName()).thenReturn(defaultSystemName);
-    mockInputs.forEach(isd -> when(isd.getSystemDescriptor()).thenReturn(defaultSystemDescriptor));
-    mockOutputs.forEach(osd -> when(osd.getSystemDescriptor()).thenReturn(defaultSystemDescriptor));
-  }
-
-  @Test
-  public void testConstructor() {
-    TaskApplication mockApp = mock(TaskApplication.class);
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(mockApp, config);
-    verify(mockApp).describe(appDesc);
-    assertEquals(config, appDesc.config);
-  }
-
-  @Test
-  public void testAddInputStreams() {
-    TaskApplication testApp = appDesc -> {
-      mockInputs.forEach(appDesc::addInputStream);
-    };
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
-    assertEquals(mockInputs.toArray(), appDesc.getInputDescriptors().values().toArray());
-  }
-
-  @Test
-  public void testAddOutputStreams() {
-    TaskApplication testApp = appDesc -> {
-      mockOutputs.forEach(appDesc::addOutputStream);
-    };
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
-    assertEquals(mockOutputs.toArray(), appDesc.getOutputDescriptors().values().toArray());
-  }
-
-  @Test
-  public void testAddTables() {
-    TaskApplication testApp = appDesc -> {
-      mockTables.forEach(appDesc::addTable);
-    };
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
-    assertEquals(mockTables, appDesc.getTableDescriptors());
-  }
-
-  @Test
-  public void testSetTaskFactory() {
-    TaskFactory mockTf = mock(TaskFactory.class);
-    TaskApplication testApp = appDesc -> appDesc.setTaskFactory(mockTf);
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
-    assertEquals(appDesc.getTaskFactory(), mockTf);
-  }
-
-  @Test
-  public void testApplicationContainerContextFactory() {
-    ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
-    TaskApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
-    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
-  }
-
-  @Test
-  public void testNoApplicationContainerContextFactory() {
-    TaskApplication testApp = appDesc -> {
-    };
-    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
-  }
-
-  @Test
-  public void testApplicationTaskContextFactory() {
-    ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
-    TaskApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
-    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
-  }
-
-  @Test
-  public void testNoApplicationTaskContextFactory() {
-    TaskApplication testApp = appDesc -> {
-    };
-    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
-  }
-
-  @Test
-  public void testProcessorLifecycleListener() {
-    ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
-    TaskApplication testApp = appDesc -> {
-      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
-    };
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
-    assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
-  }
-}
\ No newline at end of file