You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:46 UTC
[06/12] samza git commit: Consolidating package names for System,
Stream, Application and Table descriptors.
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
new file mode 100644
index 0000000..0187b2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.util.EmbeddedTaggedRateLimiter;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for remote store backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
+ /**
+ * Tag to be used for provision credits for rate limiting read operations from the remote table.
+ * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
+ * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+ * TableRateLimiter.CreditFunction)}
+ */
+ public static final String RL_READ_TAG = "readTag";
+
+ /**
+ * Tag to be used for provision credits for rate limiting write operations into the remote table.
+ * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
+ * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+ * TableRateLimiter.CreditFunction)} and it needs the write functionality.
+ */
+ public static final String RL_WRITE_TAG = "writeTag";
+
+ // Input support for a specific remote store (required)
+ private TableReadFunction<K, V> readFn;
+
+ // Output support for a specific remote store (optional)
+ private TableWriteFunction<K, V> writeFn;
+
+ // Rate limiter for client-side throttling;
+ // can either be constructed indirectly from rates or overridden by withRateLimiter()
+ private RateLimiter rateLimiter;
+
+ // Rates for constructing the default rate limiter when they are non-zero
+ private Map<String, Integer> tagCreditsMap = new HashMap<>();
+
+ private TableRateLimiter.CreditFunction<K, V> readCreditFn;
+ private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
+
+ private TableRetryPolicy readRetryPolicy;
+ private TableRetryPolicy writeRetryPolicy;
+
+ // By default execute future callbacks on the native client threads
+ // ie. no additional thread pool for callbacks.
+ private int asyncCallbackPoolSize = -1;
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ */
+ public RemoteTableDescriptor(String tableId) {
+ super(tableId);
+ }
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
+ @Override
+ public TableSpec getTableSpec() {
+ validate();
+
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ generateTableSpecConfig(tableSpecConfig);
+
+ // Serialize and store reader/writer functions
+ tableSpecConfig.put(RemoteTableProvider.READ_FN, SerdeUtils.serialize("read function", readFn));
+
+ if (writeFn != null) {
+ tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
+ }
+
+ // Serialize the rate limiter if specified
+ if (!tagCreditsMap.isEmpty()) {
+ rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
+ }
+
+ if (rateLimiter != null) {
+ tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
+ }
+
+ // Serialize the readCredit and writeCredit functions
+ if (readCreditFn != null) {
+ tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
+ "read credit function", readCreditFn));
+ }
+
+ if (writeCreditFn != null) {
+ tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
+ "write credit function", writeCreditFn));
+ }
+
+ if (readRetryPolicy != null) {
+ tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
+ "read retry policy", readRetryPolicy));
+ }
+
+ if (writeRetryPolicy != null) {
+ tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
+ "write retry policy", writeRetryPolicy));
+ }
+
+ tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
+
+ return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
+ }
+
+ /**
+ * Use specified TableReadFunction with remote table and a retry policy.
+ * @param readFn read function instance
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
+ Preconditions.checkNotNull(readFn, "null read function");
+ this.readFn = readFn;
+ return this;
+ }
+
+ /**
+ * Use specified TableWriteFunction with remote table and a retry policy.
+ * @param writeFn write function instance
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ this.writeFn = writeFn;
+ return this;
+ }
+
+ /**
+ * Use specified TableReadFunction with remote table.
+ * @param readFn read function instance
+ * @param retryPolicy retry policy for the read function
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
+ Preconditions.checkNotNull(readFn, "null read function");
+ Preconditions.checkNotNull(retryPolicy, "null retry policy");
+ this.readFn = readFn;
+ this.readRetryPolicy = retryPolicy;
+ return this;
+ }
+
+ /**
+ * Use specified TableWriteFunction with remote table.
+ * @param writeFn write function instance
+ * @param retryPolicy retry policy for the write function
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ Preconditions.checkNotNull(retryPolicy, "null retry policy");
+ this.writeFn = writeFn;
+ this.writeRetryPolicy = retryPolicy;
+ return this;
+ }
+
+ /**
+ * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
+ * of credits to be charged from the rate limiter for table read and write operations.
+ * This is an advanced API that provides greater flexibility to throttle each record in the table
+ * with different number of credits. For most common use-cases eg: limit the number of read/write
+ * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
+ * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
+ *
+ * @param rateLimiter rate limiter instance to be used for throttling
+ * @param readCreditFn credit function for rate limiting read operations
+ * @param writeCreditFn credit function for rate limiting write operations
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
+ TableRateLimiter.CreditFunction<K, V> readCreditFn,
+ TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
+ Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
+ this.rateLimiter = rateLimiter;
+ this.readCreditFn = readCreditFn;
+ this.writeCreditFn = writeCreditFn;
+ return this;
+ }
+
+ /**
+ * Specify the rate limit for table read operations. If the read rate limit is set with this method
+ * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+ * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+ * and vice versa.
+ * @param creditsPerSec rate limit for read operations; must be positive
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
+ Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
+ tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
+ return this;
+ }
+
+ /**
+ * Specify the rate limit for table write operations. If the write rate limit is set with this method
+ * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+ * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+ * and vice versa.
+ * @param creditsPerSec rate limit for write operations; must be positive
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
+ Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
+ tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
+ return this;
+ }
+
+ /**
+ * Specify the size of the thread pool for the executor used to execute
+ * callbacks of CompletableFutures of async Table operations. By default, these
+ * futures are completed (called) by the threads of the native store client. Depending
+ * on the implementation of the native client, it may or may not allow executing long
+ * running operations in the callbacks. This config can be used to execute the callbacks
+ * from a separate executor to decouple from the native client. If configured, this
+ * thread pool is shared by all read and write operations.
+ * @param poolSize max number of threads in the executor for async callbacks
+ * @return this table descriptor instance
+ */
+ public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
+ this.asyncCallbackPoolSize = poolSize;
+ return this;
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+ Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
+ "Only one of rateLimiter instance or read/write limits can be specified");
+ // Assume callback executor pool should have no more than 20 threads
+ Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
+ "too many threads for async callback executor.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java
new file mode 100644
index 0000000..fdccc70
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProvider.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote.descriptors;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteReadableTable;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.util.RateLimiter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Provide for remote table instances
+ */
+public class RemoteTableProvider extends BaseTableProvider {
+
+ static final String READ_FN = "io.read.func";
+ static final String WRITE_FN = "io.write.func";
+ static final String RATE_LIMITER = "io.ratelimiter";
+ static final String READ_CREDIT_FN = "io.read.credit.func";
+ static final String WRITE_CREDIT_FN = "io.write.credit.func";
+ static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+ static final String READ_RETRY_POLICY = "io.read.retry.policy";
+ static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
+
+ private final boolean readOnly;
+ private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
+
+ /**
+ * Map of tableId -> executor service for async table IO and callbacks. The same executors
+ * are shared by both read/write operations such that tables of the same tableId all share
+ * the set same of executors globally whereas table itself is per-task.
+ */
+ private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
+ private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+ private static ScheduledExecutorService retryExecutor;
+
+ public RemoteTableProvider(TableSpec tableSpec) {
+ super(tableSpec);
+ this.readOnly = !tableSpec.getConfig().containsKey(WRITE_FN);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Table getTable() {
+ RemoteReadableTable table;
+ String tableId = tableSpec.getId();
+
+ TableReadFunction readFn = getReadFn();
+ RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
+ if (rateLimiter != null) {
+ rateLimiter.init(this.context);
+ }
+ TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
+ TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
+
+ TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
+ TableRateLimiter writeRateLimiter = null;
+
+ TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
+ TableRetryPolicy writeRetryPolicy = null;
+
+ if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
+ retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-retry-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ if (readRetryPolicy != null) {
+ readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
+ }
+
+ TableWriteFunction writeFn = getWriteFn();
+
+ boolean isRateLimited = readRateLimiter.isRateLimited();
+ if (!readOnly) {
+ writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
+ writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
+ isRateLimited |= writeRateLimiter.isRateLimited();
+ writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY);
+ if (writeRetryPolicy != null) {
+ writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
+ }
+ }
+
+ // Optional executor for future callback/completion. Shared by both read and write operations.
+ int callbackPoolSize = Integer.parseInt(tableSpec.getConfig().get(ASYNC_CALLBACK_POOL_SIZE));
+ if (callbackPoolSize > 0) {
+ callbackExecutors.computeIfAbsent(tableId, (arg) ->
+ Executors.newFixedThreadPool(callbackPoolSize, (runnable) -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-" + tableId + "-async-callback-pool");
+ thread.setDaemon(true);
+ return thread;
+ }));
+ }
+
+ if (isRateLimited) {
+ tableExecutors.computeIfAbsent(tableId, (arg) ->
+ Executors.newSingleThreadExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-" + tableId + "-async-executor");
+ thread.setDaemon(true);
+ return thread;
+ }));
+ }
+
+ if (readOnly) {
+ table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
+ tableExecutors.get(tableId), callbackExecutors.get(tableId));
+ } else {
+ table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
+ writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
+ }
+
+ TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
+ if (readRetryPolicy != null) {
+ ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
+ }
+ if (writeRetryPolicy != null) {
+ ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
+ }
+
+ table.init(this.context);
+ tables.add(table);
+ return table;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ tables.forEach(t -> t.close());
+ tableExecutors.values().forEach(e -> e.shutdown());
+ callbackExecutors.values().forEach(e -> e.shutdown());
+ }
+
+ private <T> T deserializeObject(String key) {
+ String entry = tableSpec.getConfig().getOrDefault(key, "");
+ if (entry.isEmpty()) {
+ return null;
+ }
+ return SerdeUtils.deserialize(key, entry);
+ }
+
+ private TableReadFunction<?, ?> getReadFn() {
+ TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
+ if (readFn != null) {
+ readFn.init(this.context);
+ }
+ return readFn;
+ }
+
+ private TableWriteFunction<?, ?> getWriteFn() {
+ TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
+ if (writeFn != null) {
+ writeFn.init(this.context);
+ }
+ return writeFn;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java
new file mode 100644
index 0000000..4802265
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableProviderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote.descriptors;
+
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory class for a remote table provider
+ */
+public class RemoteTableProviderFactory implements TableProviderFactory {
+ @Override
+ public TableProvider getTableProvider(TableSpec tableSpec) {
+ Preconditions.checkNotNull(tableSpec, "null table spec");
+ return new RemoteTableProvider(tableSpec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
index 1adddc0..8f7aa7e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
@@ -39,7 +39,7 @@ import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
/**
* Wrapper for a {@link TableReadFunction} instance to add common retry
* support with a {@link TableRetryPolicy}. This wrapper is created by
- * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * {@link org.apache.samza.table.remote.descriptors.RemoteTableProvider} when a retry
* policy is specified together with the {@link TableReadFunction}.
*
* Actual retry mechanism is provided by the failsafe library. Retry is
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
index 2f3f062..ee7959a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
@@ -39,7 +39,7 @@ import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
/**
* Wrapper for a {@link TableWriteFunction} instance to add common retry
* support with a {@link TableRetryPolicy}. This wrapper is created by
- * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * {@link org.apache.samza.table.remote.descriptors.RemoteTableProvider} when a retry
* policy is specified together with the {@link TableWriteFunction}.
*
* Actual retry mechanism is provided by the failsafe library. Retry is
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
deleted file mode 100644
index dfbd835..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Base class for all table provider implementations.
- */
-abstract public class BaseTableProvider implements TableProvider {
-
- final protected Logger logger = LoggerFactory.getLogger(getClass());
-
- final protected TableSpec tableSpec;
-
- protected Context context;
-
- public BaseTableProvider(TableSpec tableSpec) {
- this.tableSpec = tableSpec;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void init(Context context) {
- this.context = context;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
- Map<String, String> tableConfig = new HashMap<>();
-
- // Insert table_id prefix to config entries
- tableSpec.getConfig().forEach((k, v) -> {
- String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
- tableConfig.put(realKey, v);
- });
-
- logger.info("Generated configuration for table " + tableSpec.getId());
-
- return tableConfig;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java
new file mode 100644
index 0000000..6f787d1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/descriptors/BaseTableProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for all table provider implementations.
+ */
+abstract public class BaseTableProvider implements TableProvider {
+
+ final protected Logger logger = LoggerFactory.getLogger(getClass());
+
+ final protected TableSpec tableSpec;
+
+ protected Context context;
+
+ public BaseTableProvider(TableSpec tableSpec) {
+ this.tableSpec = tableSpec;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void init(Context context) {
+ this.context = context;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+ Map<String, String> tableConfig = new HashMap<>();
+
+ // Insert table_id prefix to config entries
+ tableSpec.getConfig().forEach((k, v) -> {
+ String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+ tableConfig.put(realKey, v);
+ });
+
+ logger.info("Generated configuration for table " + tableSpec.getId());
+
+ return tableConfig;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 218ba5d..87131d7 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -64,11 +64,12 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
* Initializes this task during startup.
* <p>
* Implementation: Initializes the runtime {@link OperatorImplGraph} according to user-defined {@link OperatorSpecGraph}.
- * Users set the input and output streams and the task-wide context manager using {@link org.apache.samza.application.StreamApplicationDescriptor} APIs,
- * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. After the
- * {@link org.apache.samza.application.StreamApplicationDescriptorImpl} is initialized once by the application, it then creates
- * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph}
- * corresponding to the logical DAG.
+ * Users set the input and output streams and the task-wide context manager using
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor} APIs, and the logical transforms
+ * using the {@link org.apache.samza.operators.MessageStream} APIs. After the
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl} is initialized once by the
+ * application, it then creates an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this
+ * class to create the {@link OperatorImplGraph} corresponding to the logical DAG.
*
* @param context allows initializing and accessing contextual data of this StreamTask
* @throws Exception in case of initialization errors
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index c312fac..b2297e1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -21,10 +21,10 @@ package org.apache.samza.task;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.application.TaskApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
import org.apache.samza.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 929d6a4..ee6aff3 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,7 +19,8 @@
package org.apache.samza.job.local
-import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil}
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
import org.apache.samza.config.{Config, TaskConfigJava}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
index ccd88b8..8b96c8a 100644
--- a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
+++ b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
@@ -18,6 +18,8 @@
*/
package org.apache.samza.application;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+
/**
* Test class of {@link StreamApplication} for unit tests
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index 9b590c4..ab91cee 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -25,6 +25,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
import org.apache.samza.task.MockStreamTask;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
deleted file mode 100644
index de16ef2..0000000
--- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit test for {@link StreamApplicationDescriptorImpl}
- */
-public class TestStreamApplicationDescriptorImpl {
-
- @Test
- public void testConstructor() {
- StreamApplication mockApp = mock(StreamApplication.class);
- Config mockConfig = mock(Config.class);
- StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(mockApp, mockConfig);
- verify(mockApp).describe(appDesc);
- assertEquals(mockConfig, appDesc.config);
- }
-
- @Test
- public void testGetInputStreamWithValueSerde() {
-
- String streamId = "test-stream-1";
- Serde mockValueSerde = mock(Serde.class);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- }, mock(Config.class));
-
- InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
- assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test
- public void testGetInputStreamWithKeyValueSerde() {
-
- String streamId = "test-stream-1";
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- }, mock(Config.class));
-
- InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
- assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testGetInputStreamWithNullSerde() {
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- }, mock(Config.class));
- }
-
- @Test
- public void testGetInputStreamWithTransformFunction() {
- String streamId = "test-stream-1";
- Serde mockValueSerde = mock(Serde.class);
- InputTransformer transformer = ime -> ime;
- MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer);
- MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- }, mock(Config.class));
-
- InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
- assertEquals(transformer, inputOpSpec.getTransformer());
- }
-
- @Test
- public void testGetInputStreamWithExpandingSystem() {
- String streamId = "test-stream-1";
- String expandedStreamId = "expanded-stream";
- AtomicInteger expandCallCount = new AtomicInteger();
- StreamExpander expander = (sg, isd) -> {
- expandCallCount.incrementAndGet();
- InputDescriptor expandedISD =
- new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
- .getInputDescriptor(expandedStreamId, new IntegerSerde());
-
- return sg.getInputStream(expandedISD);
- };
- MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander);
- MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- }, mock(Config.class));
-
- InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(1, expandCallCount.get());
- assertFalse(streamAppDesc.getInputOperators().containsKey(streamId));
- assertFalse(streamAppDesc.getInputDescriptors().containsKey(streamId));
- assertTrue(streamAppDesc.getInputDescriptors().containsKey(expandedStreamId));
- assertEquals(expandedStreamId, inputOpSpec.getStreamId());
- }
-
- @Test
- public void testGetInputStreamWithRelaxedTypes() {
- String streamId = "test-stream-1";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- }, mock(Config.class));
-
- InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
- }
-
- @Test
- public void testMultipleGetInputStreams() {
- String streamId1 = "test-stream-1";
- String streamId2 = "test-stream-2";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class));
- GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class));
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd1);
- appDesc.getInputStream(isd2);
- }, mock(Config.class));
-
- InputOperatorSpec inputOpSpec1 = streamAppDesc.getInputOperators().get(streamId1);
- InputOperatorSpec inputOpSpec2 = streamAppDesc.getInputOperators().get(streamId2);
-
- assertEquals(2, streamAppDesc.getInputOperators().size());
- assertEquals(streamId1, inputOpSpec1.getStreamId());
- assertEquals(streamId2, inputOpSpec2.getStreamId());
- assertEquals(2, streamAppDesc.getInputDescriptors().size());
- assertEquals(isd1, streamAppDesc.getInputDescriptors().get(streamId1));
- assertEquals(isd2, streamAppDesc.getInputDescriptors().get(streamId2));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameInputStreamTwice() {
- String streamId = "test-stream-1";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class));
- GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class));
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd1);
- // should throw exception
- appDesc.getInputStream(isd2);
- }, mock(Config.class));
- }
-
- @Test
- public void testMultipleSystemDescriptorForSameSystemName() {
- GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd1 = sd1.getInputDescriptor("test-stream-1", mock(Serde.class));
- GenericInputDescriptor isd2 = sd2.getInputDescriptor("test-stream-2", mock(Serde.class));
- GenericOutputDescriptor osd1 = sd2.getOutputDescriptor("test-stream-3", mock(Serde.class));
-
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd1);
- try {
- appDesc.getInputStream(isd2);
- fail("Adding input stream with the same system name but different SystemDescriptor should have failed");
- } catch (IllegalStateException e) { }
-
- try {
- appDesc.getOutputStream(osd1);
- fail("adding output stream with the same system name but different SystemDescriptor should have failed");
- } catch (IllegalStateException e) { }
- }, mock(Config.class));
-
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.withDefaultSystem(sd2);
- try {
- appDesc.getInputStream(isd1);
- fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
- } catch (IllegalStateException e) { }
- }, mock(Config.class));
- }
-
- @Test
- public void testGetOutputStreamWithKeyValueSerde() {
- String streamId = "test-stream-1";
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde);
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getOutputStream(osd);
- }, mock(Config.class));
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
- assertEquals(streamId, outputStreamImpl.getStreamId());
- assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
- assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testGetOutputStreamWithNullSerde() {
- String streamId = "test-stream-1";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getOutputStream(osd);
- }, mock(Config.class));
- }
-
- @Test
- public void testGetOutputStreamWithValueSerde() {
- String streamId = "test-stream-1";
- Serde mockValueSerde = mock(Serde.class);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde);
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getOutputStream(osd);
- }, mock(Config.class));
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
- assertEquals(streamId, outputStreamImpl.getStreamId());
- assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
- assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
- String streamId = "test-stream-1";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
-
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(isd);
- appDesc.withDefaultSystem(sd); // should throw exception
- }, mock(Config.class));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
- String streamId = "test-stream-1";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mock(Serde.class));
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getOutputStream(osd);
- appDesc.withDefaultSystem(sd); // should throw exception
- }, mock(Config.class));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
- String streamId = "test-stream-1";
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
- streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
- streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class)); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameOutputStreamTwice() {
- String streamId = "test-stream-1";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class));
- GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class));
- new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getOutputStream(osd1);
- appDesc.getOutputStream(osd2); // should throw exception
- }, mock(Config.class));
- }
-
- @Test
- public void testGetIntermediateStreamWithValueSerde() {
- String streamId = "stream-1";
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- streamAppDesc.getIntermediateStream(streamId, mockValueSerde, false);
-
- assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
- assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
- assertEquals(streamId, intermediateStreamImpl.getStreamId());
- assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithKeyValueSerde() {
- String streamId = "streamId";
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- streamAppDesc.getIntermediateStream(streamId, mockKVSerde, false);
-
- assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
- assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
- assertEquals(streamId, intermediateStreamImpl.getStreamId());
- assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
- assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
- Config mockConfig = mock(Config.class);
- String streamId = "streamId";
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mock-system", "mock-system-factory");
- streamAppDesc.withDefaultSystem(sd);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
-
- assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
- assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
- assertEquals(streamId, intermediateStreamImpl.getStreamId());
- }
-
- @Test(expected = NullPointerException.class)
- public void testGetIntermediateStreamWithNoSerde() {
- Config mockConfig = mock(Config.class);
- String streamId = "streamId";
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- streamAppDesc.getIntermediateStream(streamId, null, false); // should throw
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameIntermediateStreamTwice() {
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
- streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
- // should throw exception
- streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
- }
-
- @Test
- public void testGetNextOpIdIncrementsId() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
- assertEquals("jobName-1234-merge-0", streamAppDesc.getNextOpId(OpCode.MERGE, null));
- assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
- assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, null));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetNextOpIdRejectsDuplicates() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
- assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
- streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
- }
-
- @Test
- public void testOpIdValidation() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
-
- // null and empty userDefinedIDs should fall back to autogenerated IDs.
- try {
- streamAppDesc.getNextOpId(OpCode.FILTER, null);
- streamAppDesc.getNextOpId(OpCode.FILTER, "");
- streamAppDesc.getNextOpId(OpCode.FILTER, " ");
- streamAppDesc.getNextOpId(OpCode.FILTER, "\t");
- } catch (SamzaException e) {
- fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
- }
-
- List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID");
- for (String validOpId: validOpIds) {
- try {
- streamAppDesc.getNextOpId(OpCode.FILTER, validOpId);
- } catch (Exception e) {
- fail("Received an exception with a valid operator ID: " + validOpId);
- }
- }
-
- List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
- for (String invalidOpId: invalidOpIds) {
- try {
- streamAppDesc.getNextOpId(OpCode.FILTER, invalidOpId);
- fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
- } catch (SamzaException e) { }
- }
- }
-
- @Test
- public void testGetInputStreamPreservesInsertionOrder() {
- Config mockConfig = mock(Config.class);
-
- String testStreamId1 = "test-stream-1";
- String testStreamId2 = "test-stream-2";
- String testStreamId3 = "test-stream-3";
-
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class)));
- appDesc.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class)));
- appDesc.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class)));
- }, mockConfig);
-
- List<InputOperatorSpec> inputSpecs = new ArrayList<>(streamAppDesc.getInputOperators().values());
- assertEquals(inputSpecs.size(), 3);
- assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
- assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
- assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
- }
-
- @Test
- public void testGetTable() throws Exception {
- Config mockConfig = mock(Config.class);
-
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
- when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
- when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
- when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getTable(mockTableDescriptor);
- }, mockConfig);
- assertNotNull(streamAppDesc.getTables().get(testTableSpec.getId()));
- }
-
- @Test
- public void testApplicationContainerContextFactory() {
- ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
- StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
- }
-
- @Test
- public void testNoApplicationContainerContextFactory() {
- StreamApplication testApp = appDesc -> {
- };
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
- }
-
- @Test
- public void testApplicationTaskContextFactory() {
- ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
- StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
- }
-
- @Test
- public void testNoApplicationTaskContextFactory() {
- StreamApplication testApp = appDesc -> {
- };
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
- }
-
- @Test
- public void testProcessorLifecycleListenerFactory() {
- ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
- StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
- StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetTableWithBadId() {
- Config mockConfig = mock(Config.class);
- new StreamApplicationDescriptorImpl(appDesc -> {
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- when(mockTableDescriptor.getTableId()).thenReturn("my.table");
- appDesc.getTable(mockTableDescriptor);
- }, mockConfig);
- }
-
- class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
- public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
- super(systemName, "factory.class", null, expander);
- }
-
- @Override
- public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
- return new MockInputDescriptor<>(streamId, this, serde);
- }
- }
-
- class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> {
- public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) {
- super(systemName, "factory.class", transformer, null);
- }
-
- @Override
- public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
- return new MockInputDescriptor<>(streamId, this, serde);
- }
- }
-
- public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
- MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
- super(streamId, serde, systemDescriptor, null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
deleted file mode 100644
index e79e25b..0000000
--- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.task.TaskFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit test for {@link TaskApplicationDescriptorImpl}
- */
-public class TestTaskApplicationDescriptorImpl {
-
- private Config config = mock(Config.class);
- private String defaultSystemName = "test-system";
- private SystemDescriptor defaultSystemDescriptor = mock(SystemDescriptor.class);
- private List<InputDescriptor> mockInputs = new ArrayList<InputDescriptor>() { {
- InputDescriptor mock1 = mock(InputDescriptor.class);
- InputDescriptor mock2 = mock(InputDescriptor.class);
- when(mock1.getStreamId()).thenReturn("test-input1");
- when(mock2.getStreamId()).thenReturn("test-input2");
- this.add(mock1);
- this.add(mock2);
- } };
- private List<OutputDescriptor> mockOutputs = new ArrayList<OutputDescriptor>() { {
- OutputDescriptor mock1 = mock(OutputDescriptor.class);
- OutputDescriptor mock2 = mock(OutputDescriptor.class);
- when(mock1.getStreamId()).thenReturn("test-output1");
- when(mock2.getStreamId()).thenReturn("test-output2");
- this.add(mock1);
- this.add(mock2);
- } };
- private Set<TableDescriptor> mockTables = new HashSet<TableDescriptor>() { {
- BaseTableDescriptor mock1 = mock(BaseTableDescriptor.class);
- BaseTableDescriptor mock2 = mock(BaseTableDescriptor.class);
- when(mock1.getTableId()).thenReturn("test-table1");
- when(mock2.getTableId()).thenReturn("test-table2");
- when(mock1.getSerde()).thenReturn(mock(KVSerde.class));
- when(mock2.getSerde()).thenReturn(mock(KVSerde.class));
- this.add(mock1);
- this.add(mock2);
- } };
-
- @Before
- public void setUp() {
- when(defaultSystemDescriptor.getSystemName()).thenReturn(defaultSystemName);
- mockInputs.forEach(isd -> when(isd.getSystemDescriptor()).thenReturn(defaultSystemDescriptor));
- mockOutputs.forEach(osd -> when(osd.getSystemDescriptor()).thenReturn(defaultSystemDescriptor));
- }
-
- @Test
- public void testConstructor() {
- TaskApplication mockApp = mock(TaskApplication.class);
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(mockApp, config);
- verify(mockApp).describe(appDesc);
- assertEquals(config, appDesc.config);
- }
-
- @Test
- public void testAddInputStreams() {
- TaskApplication testApp = appDesc -> {
- mockInputs.forEach(appDesc::addInputStream);
- };
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
- assertEquals(mockInputs.toArray(), appDesc.getInputDescriptors().values().toArray());
- }
-
- @Test
- public void testAddOutputStreams() {
- TaskApplication testApp = appDesc -> {
- mockOutputs.forEach(appDesc::addOutputStream);
- };
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
- assertEquals(mockOutputs.toArray(), appDesc.getOutputDescriptors().values().toArray());
- }
-
- @Test
- public void testAddTables() {
- TaskApplication testApp = appDesc -> {
- mockTables.forEach(appDesc::addTable);
- };
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
- assertEquals(mockTables, appDesc.getTableDescriptors());
- }
-
- @Test
- public void testSetTaskFactory() {
- TaskFactory mockTf = mock(TaskFactory.class);
- TaskApplication testApp = appDesc -> appDesc.setTaskFactory(mockTf);
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
- assertEquals(appDesc.getTaskFactory(), mockTf);
- }
-
- @Test
- public void testApplicationContainerContextFactory() {
- ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
- TaskApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
- TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
- }
-
- @Test
- public void testNoApplicationContainerContextFactory() {
- TaskApplication testApp = appDesc -> {
- };
- TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
- }
-
- @Test
- public void testApplicationTaskContextFactory() {
- ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
- TaskApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
- TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
- }
-
- @Test
- public void testNoApplicationTaskContextFactory() {
- TaskApplication testApp = appDesc -> {
- };
- TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
- }
-
- @Test
- public void testProcessorLifecycleListener() {
- ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
- TaskApplication testApp = appDesc -> {
- appDesc.withProcessorLifecycleListenerFactory(mockFactory);
- };
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
- assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
- }
-}
\ No newline at end of file