You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/08 22:04:40 UTC
[4/4] samza git commit: SAMZA-1981: Consolidate table descriptors to
samza-api
SAMZA-1981: Consolidate table descriptors to samza-api
As per subject, table descriptors moved are
- LocalTableDescriptor
- RemoteTableDescriptor
- HybridTableDescriptor
- GuavaCacheTableDescriptor
- CachingTableDescriptor
Author: Wei Song <ws...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #799 from weisong44/SAMZA-1981
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3da75e61
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3da75e61
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3da75e61
Branch: refs/heads/master
Commit: 3da75e61ab61dad8aacd31dd38513e4edaa9d36a
Parents: 2159171
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Nov 8 14:04:28 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Nov 8 14:04:28 2018 -0800
----------------------------------------------------------------------
.../org/apache/samza/table/TableProvider.java | 61 ++++
.../samza/table/TableProviderFactory.java | 35 +++
.../table/descriptors/BaseTableDescriptor.java | 110 +++++++
.../descriptors/CachingTableDescriptor.java | 173 +++++++++++
.../descriptors/GuavaCacheTableDescriptor.java | 79 +++++
.../descriptors/HybridTableDescriptor.java | 48 +++
.../table/descriptors/LocalTableDescriptor.java | 168 +++++++++++
.../descriptors/RemoteTableDescriptor.java | 296 +++++++++++++++++++
.../samza/table/descriptors/TableProvider.java | 62 ----
.../table/descriptors/TableProviderFactory.java | 36 ---
.../samza/table/remote/TableRateLimiter.java | 167 +++++++++++
.../samza/table/remote/TableReadFunction.java | 111 +++++++
.../samza/table/remote/TableWriteFunction.java | 159 ++++++++++
.../samza/table/retry/TableRetryPolicy.java | 257 ++++++++++++++++
.../apache/samza/table/utils/SerdeUtils.java | 66 +++++
.../table/remote/TestTableRateLimiter.java | 103 +++++++
.../descriptors/ApplicationDescriptorImpl.java | 6 +-
.../apache/samza/config/JavaTableConfig.java | 5 +-
.../apache/samza/table/BaseTableProvider.java | 71 +++++
.../samza/table/TableConfigGenerator.java | 2 -
.../org/apache/samza/table/TableManager.java | 2 -
.../table/caching/CachingTableProvider.java | 98 ++++++
.../caching/CachingTableProviderFactory.java | 34 +++
.../descriptors/CachingTableDescriptor.java | 166 -----------
.../descriptors/CachingTableProvider.java | 105 -------
.../CachingTableProviderFactory.java | 34 ---
.../caching/guava/GuavaCacheTableProvider.java | 59 ++++
.../guava/GuavaCacheTableProviderFactory.java | 34 +++
.../descriptors/GuavaCacheTableDescriptor.java | 75 -----
.../descriptors/GuavaCacheTableProvider.java | 60 ----
.../GuavaCacheTableProviderFactory.java | 34 ---
.../descriptors/BaseHybridTableDescriptor.java | 48 ---
.../table/descriptors/BaseTableDescriptor.java | 110 -------
.../samza/table/remote/RemoteTableProvider.java | 190 ++++++++++++
.../remote/RemoteTableProviderFactory.java | 38 +++
.../samza/table/remote/TableRateLimiter.java | 167 -----------
.../samza/table/remote/TableReadFunction.java | 111 -------
.../samza/table/remote/TableWriteFunction.java | 159 ----------
.../descriptors/RemoteTableDescriptor.java | 275 -----------------
.../remote/descriptors/RemoteTableProvider.java | 202 -------------
.../descriptors/RemoteTableProviderFactory.java | 38 ---
.../table/retry/RetriableReadFunction.java | 2 +-
.../table/retry/RetriableWriteFunction.java | 2 +-
.../samza/table/retry/TableRetryPolicy.java | 257 ----------------
.../apache/samza/table/utils/SerdeUtils.java | 66 -----
.../utils/descriptors/BaseTableProvider.java | 73 -----
.../TestJobNodeConfigurationGenerator.java | 4 +-
.../apache/samza/table/TestTableManager.java | 2 -
.../samza/table/caching/TestCachingTable.java | 18 +-
.../table/remote/TestTableRateLimiter.java | 103 -------
.../descriptors/TestRemoteTableDescriptor.java | 14 +-
.../descriptors/InMemoryTableDescriptor.java | 4 +-
.../descriptors/InMemoryTableProvider.java | 4 +-
.../InMemoryTableProviderFactory.java | 4 +-
.../descriptors/TestInMemoryTableProvider.java | 2 +-
.../kv/descriptors/RocksDbTableDescriptor.java | 3 +-
.../kv/descriptors/RocksDbTableProvider.java | 3 +-
.../RocksDbTableProviderFactory.java | 4 +-
.../descriptors/TestRocksDbTableDescriptor.java | 10 +-
.../descriptors/TestRocksDbTableProvider.java | 2 +-
.../samza/storage/kv/LocalTableProvider.java | 148 ++++++++++
.../descriptors/BaseLocalTableDescriptor.java | 168 -----------
.../kv/descriptors/BaseLocalTableProvider.java | 149 ----------
.../descriptors/TestBaseLocalTableProvider.java | 150 ----------
.../kv/descriptors/TestLocalTableProvider.java | 152 ++++++++++
.../sql/testutil/TestIOResolverFactory.java | 6 +-
.../samza/test/table/TestRemoteTable.java | 6 +-
67 files changed, 2708 insertions(+), 2702 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
new file mode 100644
index 0000000..350324c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+
+/**
+ * A table provider provides the implementation for a table. It ensures a table is
+ * properly constructed and also manages its lifecycle.
+ */
+@InterfaceStability.Unstable
+public interface TableProvider {
+ /**
+ * Initialize TableProvider with container and task context
+ * @param context context for the task
+ */
+ void init(Context context);
+
+ /**
+ * Get an instance of the table for read/write operations
+ * @return the underlying table
+ */
+ Table getTable();
+
+ /**
+ * Generate any configuration for this table, the generated configuration
+ * is used by Samza container to construct this table and any components
+ * necessary. Instead of manipulating the input parameters, this method
+ * should return the generated configuration.
+ *
+ * @param jobConfig the job config
+ * @param generatedConfig config generated by earlier processing, but has
+ * not yet been merged to job config
+ * @return configuration for this table
+ */
+ Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
+
+ /**
+ * Shutdown the underlying table
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
new file mode 100644
index 0000000..1bb0196
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Factory of a table provider object
+ */
+@InterfaceStability.Unstable
+public interface TableProviderFactory {
+ /**
+ * Constructs an instance of the table provider based on a given table spec
+ * @param tableSpec the table spec
+ * @return the table provider
+ */
+ TableProvider getTableProvider(TableSpec tableSpec);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
new file mode 100644
index 0000000..246216b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Base class for all table descriptor implementations.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
+ implements TableDescriptor<K, V, D> {
+
+ protected final String tableId;
+
+ protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
+
+ protected final Map<String, String> config = new HashMap<>();
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ */
+ protected BaseTableDescriptor(String tableId) {
+ this.tableId = tableId;
+ }
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ this.tableId = tableId;
+ this.serde = serde;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public D withConfig(String key, String value) {
+ config.put(key, value);
+ return (D) this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getTableId() {
+ return tableId;
+ }
+
+ /**
+ * Get the serde assigned to this {@link TableDescriptor}
+ *
+ * @return {@link KVSerde} used by this table
+ */
+ public KVSerde<K, V> getSerde() {
+ return serde;
+ }
+
+ /**
+ * Generate config for {@link TableSpec}; this method is used internally.
+ * @param tableSpecConfig configuration for the {@link TableSpec}
+ */
+ protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+ tableSpecConfig.putAll(config);
+ }
+
+ /**
+ * Validate that this table descriptor is constructed properly; this method is used internally.
+ */
+ protected void validate() {
+ }
+
+ /**
+ * Create a {@link TableSpec} from this table descriptor; this method is used internally.
+ *
+ * @return the {@link TableSpec}
+ */
+ abstract public TableSpec getTableSpec();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
new file mode 100644
index 0000000..d6248c6
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for a caching table.
+ * @param <K> type of the key in the cache
+ * @param <V> type of the value in the cache
+ */
+public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
+
+ public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.caching.CachingTableProviderFactory";
+
+ public static final String REAL_TABLE_ID = "realTableId";
+ public static final String CACHE_TABLE_ID = "cacheTableId";
+ public static final String READ_TTL_MS = "readTtl";
+ public static final String WRITE_TTL_MS = "writeTtl";
+ public static final String CACHE_SIZE = "cacheSize";
+ public static final String WRITE_AROUND = "writeAround";
+
+ private Duration readTtl;
+ private Duration writeTtl;
+ private long cacheSize;
+ private TableDescriptor<K, V, ?> cache;
+ private TableDescriptor<K, V, ?> table;
+ private boolean isWriteAround;
+
+ /**
+ * Constructs a table descriptor instance with internal cache
+ *
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param table target table descriptor
+ */
+ public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
+ super(tableId);
+ this.table = table;
+ }
+
+ /**
+ * Constructs a table descriptor instance and specify a cache (as Table descriptor)
+ * to be used for caching. Cache get is not synchronized with put for better parallelism
+ * in the read path of caching table. As such, cache table implementation is
+ * expected to be thread-safe for concurrent accesses.
+ *
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param table target table descriptor
+ * @param cache cache table descriptor
+ */
+ public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
+ TableDescriptor<K, V, ?> cache) {
+ this(tableId, table);
+ this.cache = cache;
+ }
+
+ @Override
+ public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
+ return cache != null
+ ? Arrays.asList(cache, table)
+ : Arrays.asList(table);
+ }
+
+ @Override
+ public TableSpec getTableSpec() {
+ validate();
+
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ generateTableSpecConfig(tableSpecConfig);
+
+ if (cache != null) {
+ tableSpecConfig.put(CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
+ } else {
+ if (readTtl != null) {
+ tableSpecConfig.put(READ_TTL_MS, String.valueOf(readTtl.toMillis()));
+ }
+ if (writeTtl != null) {
+ tableSpecConfig.put(WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
+ }
+ if (cacheSize > 0) {
+ tableSpecConfig.put(CACHE_SIZE, String.valueOf(cacheSize));
+ }
+ }
+
+ tableSpecConfig.put(REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
+ tableSpecConfig.put(WRITE_AROUND, String.valueOf(isWriteAround));
+
+ return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
+ }
+
+ /**
+ * Specify the TTL for each read access, i.e. record is expired after
+ * the TTL duration since last read access of each key.
+ * @param readTtl read TTL
+ * @return this descriptor
+ */
+ public CachingTableDescriptor<K, V> withReadTtl(Duration readTtl) {
+ this.readTtl = readTtl;
+ return this;
+ }
+
+ /**
+ * Specify the TTL for each write access, i.e. record is expired after
+ * the TTL duration since last write access of each key.
+ * @param writeTtl write TTL
+ * @return this descriptor
+ */
+ public CachingTableDescriptor<K, V> withWriteTtl(Duration writeTtl) {
+ this.writeTtl = writeTtl;
+ return this;
+ }
+
+ /**
+ * Specify the max cache size for size-based eviction.
+ * @param cacheSize max size of the cache
+ * @return this descriptor
+ */
+ public CachingTableDescriptor<K, V> withCacheSize(long cacheSize) {
+ this.cacheSize = cacheSize;
+ return this;
+ }
+
+ /**
+ * Specify if write-around policy should be used to bypass writing
+ * to cache for put operations. This is useful when put() is the
+ * dominant operation and get() has no locality with recent puts.
+ * @return this descriptor
+ */
+ public CachingTableDescriptor<K, V> withWriteAround() {
+ this.isWriteAround = true;
+ return this;
+ }
+
+ @Override
+ @VisibleForTesting
+ public void validate() {
+ super.validate();
+ Preconditions.checkNotNull(table, "Actual table is required.");
+ if (cache == null) {
+ Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
+ } else {
+ Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
+ "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
new file mode 100644
index 0000000..192bd7e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.SerdeUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+
+
+/**
+ * Table descriptor for Guava-based caching table.
+ * @param <K> type of the key in the cache
+ * @param <V> type of the value in the cache
+ */
+public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, GuavaCacheTableDescriptor<K, V>> {
+
+ public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.caching.guava.GuavaCacheTableProviderFactory";
+
+ public static final String GUAVA_CACHE = "guavaCache";
+
+ private Cache<K, V> cache;
+
+ /**
+ * Constructs a Guava cache table descriptor instance; {@code tableId} is passed to the superclass constructor.
+ */
+ public GuavaCacheTableDescriptor(String tableId) {
+ super(tableId);
+ }
+
+ @Override
+ public TableSpec getTableSpec() {
+ validate();
+
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ generateTableSpecConfig(tableSpecConfig);
+
+ tableSpecConfig.put(GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
+
+ return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
+ }
+
+ /**
+ * Specify a pre-configured Guava cache instance to be used for caching table.
+ * @param cache Guava cache instance; {@code validate()} rejects a descriptor whose cache is still null
+ * @return this descriptor, with its key/value type parameters preserved for fluent chaining
+ */
+ public GuavaCacheTableDescriptor<K, V> withCache(Cache<K, V> cache) {
+ this.cache = cache;
+ return this;
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
new file mode 100644
index 0000000..ec8cc67
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import java.util.List;
+
+/**
+ * Base class for hybrid table descriptors. A hybrid table consists of one or more
+ * table descriptors, and it orchestrates operations between them to achieve more advanced
+ * functionality.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ * @param <D> the type of this table descriptor
+ */
+abstract public class HybridTableDescriptor<K, V, D extends HybridTableDescriptor<K, V, D>>
+ extends BaseTableDescriptor<K, V, D> {
+
+ /**
+ * Constructs a hybrid table descriptor instance; {@code tableId} is passed to the superclass constructor.
+ */
+ public HybridTableDescriptor(String tableId) {
+ super(tableId);
+ }
+
+ /**
+ * Get tables contained within this table.
+ * @return list of tables
+ */
+ abstract public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
new file mode 100644
index 0000000..dfcaea4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.SideInputsProcessor;
+
+
+/**
+ * Table descriptor for store backed tables.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
+ extends BaseTableDescriptor<K, V, D> {
+
+ static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
+ static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
+ static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
+
+ protected List<String> sideInputs;
+ protected SideInputsProcessor sideInputsProcessor;
+ protected boolean enableChangelog;
+ protected String changelogStream;
+ protected Integer changelogReplicationFactor;
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ */
+ public LocalTableDescriptor(String tableId) {
+ super(tableId);
+ }
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public LocalTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
+ public D withSideInputs(List<String> sideInputs) {
+ this.sideInputs = sideInputs;
+ // Disable changelog
+ this.enableChangelog = false;
+ this.changelogStream = null;
+ this.changelogReplicationFactor = null;
+ return (D) this;
+ }
+
+ public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
+ this.sideInputsProcessor = sideInputsProcessor;
+ return (D) this;
+ }
+
+ /**
+ * Enable changelog for this table, by default changelog is disabled. When the
+ * changelog stream name is not specified, it is automatically generated in
+ * the format {@literal [job-name]-[job-id]-table-[table-id]}.
+ * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+ *
+ * @return this table descriptor instance
+ */
+ public D withChangelogEnabled() {
+ this.enableChangelog = true;
+ return (D) this;
+ }
+
+ /**
+ * Samza stores are local to a container. If the container fails, the contents of
+ * the store are lost. To prevent loss of data, you need to set this property to
+ * configure a changelog stream: Samza then ensures that writes to the store are
+ * replicated to this stream, and the store is restored from this stream after a
+ * failure. The value of this property is given in the form system-name.stream-name.
+ * The "system-name" part is optional. If it is omitted you must specify the system
+ * in <code>job.changelog.system</code> config. Any output stream can be used as
+ * changelog, but you must ensure that only one job ever writes to a given changelog
+ * stream (each instance of a job and each store needs its own changelog stream).
+ * <p>
+ * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+ *
+ * @param changelogStream changelog stream name
+ * @return this table descriptor instance
+ */
+ public D withChangelogStream(String changelogStream) {
+ this.enableChangelog = true;
+ this.changelogStream = changelogStream;
+ return (D) this;
+ }
+
+ /**
+ * The property defines the number of replicas to use for the change log stream.
+ * <p>
+ * Default value is <code>stores.default.changelog.replication.factor</code>.
+ * <p>
+ * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide
+ *
+ * @param replicationFactor replication factor
+ * @return this table descriptor instance
+ */
+ public D withChangelogReplicationFactor(int replicationFactor) {
+ this.enableChangelog = true;
+ this.changelogReplicationFactor = replicationFactor;
+ return (D) this;
+ }
+
+ @Override
+ protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+ super.generateTableSpecConfig(tableSpecConfig);
+
+ tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
+ if (enableChangelog) {
+ if (changelogStream != null) {
+ tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
+ }
+ if (changelogReplicationFactor != null) {
+ tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
+ }
+ }
+ }
+
+ /**
+ * Validate that this table descriptor is constructed properly
+ */
+ @Override
+ protected void validate() {
+ super.validate();
+ if (sideInputs != null || sideInputsProcessor != null) {
+ Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
+ String.format("Invalid side input configuration for table: %s. " +
+ "Both side inputs and the processor must be provided", tableId));
+ }
+ if (!enableChangelog) {
+ Preconditions.checkState(changelogStream == null,
+ String.format("Invalid changelog configuration for table: %s. Changelog " +
+ "must be enabled, when changelog stream name is provided", tableId));
+ Preconditions.checkState(changelogReplicationFactor == null,
+ String.format("Invalid changelog configuration for table: %s. Changelog " +
+ "must be enabled, when changelog replication factor is provided", tableId));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
new file mode 100644
index 0000000..8b34ec9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for remote store backed tables.
+ *
+ * <p> A {@link TableReadFunction} is mandatory; a {@link TableWriteFunction}, retry policies and
+ * rate limiting are optional. Rate limiting is configured either with a custom {@link RateLimiter}
+ * instance or with plain read/write rates, but not with both; this is enforced when the
+ * descriptor is validated.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
+
+  public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.remote.RemoteTableProviderFactory";
+
+  public static final String DEFAULT_RATE_LIMITER_CLASS_NAME = "org.apache.samza.util.EmbeddedTaggedRateLimiter";
+
+  /**
+   * Tag to be used for provision credits for rate limiting read operations from the remote table.
+   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+   * TableRateLimiter.CreditFunction)}
+   */
+  public static final String RL_READ_TAG = "readTag";
+
+  /**
+   * Tag to be used for provision credits for rate limiting write operations into the remote table.
+   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+   * TableRateLimiter.CreditFunction)} and it needs the write functionality.
+   */
+  public static final String RL_WRITE_TAG = "writeTag";
+
+  // Keys under which the (serialized) settings are stored in the table spec config
+  public static final String READ_FN = "io.read.func";
+  public static final String WRITE_FN = "io.write.func";
+  public static final String RATE_LIMITER = "io.ratelimiter";
+  public static final String READ_CREDIT_FN = "io.read.credit.func";
+  public static final String WRITE_CREDIT_FN = "io.write.credit.func";
+  public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+  public static final String READ_RETRY_POLICY = "io.read.retry.policy";
+  public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
+
+  // Input support for a specific remote store (required)
+  private TableReadFunction<K, V> readFn;
+
+  // Output support for a specific remote store (optional)
+  private TableWriteFunction<K, V> writeFn;
+
+  // Rate limiter for client-side throttling; it is set by withRateLimiter()
+  private RateLimiter rateLimiter;
+
+  // Per-tag rates for constructing the default rate limiter; populated by
+  // withReadRateLimit() and withWriteRateLimit(), empty when neither was called
+  private final Map<String, Integer> tagCreditsMap = new HashMap<>();
+
+  private TableRateLimiter.CreditFunction<K, V> readCreditFn;
+  private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
+
+  private TableRetryPolicy readRetryPolicy;
+  private TableRetryPolicy writeRetryPolicy;
+
+  // By default execute future callbacks on the native client threads
+  // ie. no additional thread pool for callbacks.
+  private int asyncCallbackPoolSize = -1;
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  public RemoteTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  /**
+   * Validates this descriptor and builds its table spec. The read function is always serialized
+   * into the spec config; the write function, rate limiter, credit functions and retry policies
+   * are serialized only when they have been set. The async callback pool size is always stored.
+   *
+   * @return the table spec for this remote table
+   * @throws SamzaException if the default rate limiter cannot be created
+   */
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    // Serialize and store reader/writer functions
+    tableSpecConfig.put(READ_FN, SerdeUtils.serialize("read function", readFn));
+
+    if (writeFn != null) {
+      tableSpecConfig.put(WRITE_FN, SerdeUtils.serialize("write function", writeFn));
+    }
+
+    // validate() has already rejected descriptors that set both rates and a custom rate limiter
+    if (!tagCreditsMap.isEmpty()) {
+      RateLimiter defaultRateLimiter = createDefaultRateLimiter();
+      tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter));
+    } else if (rateLimiter != null) {
+      tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
+    }
+
+    // Serialize the readCredit functions
+    if (readCreditFn != null) {
+      tableSpecConfig.put(READ_CREDIT_FN, SerdeUtils.serialize(
+          "read credit function", readCreditFn));
+    }
+    // Serialize the writeCredit functions
+    if (writeCreditFn != null) {
+      tableSpecConfig.put(WRITE_CREDIT_FN, SerdeUtils.serialize(
+          "write credit function", writeCreditFn));
+    }
+
+    if (readRetryPolicy != null) {
+      tableSpecConfig.put(READ_RETRY_POLICY, SerdeUtils.serialize(
+          "read retry policy", readRetryPolicy));
+    }
+
+    if (writeRetryPolicy != null) {
+      tableSpecConfig.put(WRITE_RETRY_POLICY, SerdeUtils.serialize(
+          "write retry policy", writeRetryPolicy));
+    }
+
+    tableSpecConfig.put(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
+
+    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
+  }
+
+  /**
+   * Reflectively instantiates {@link #DEFAULT_RATE_LIMITER_CLASS_NAME} through its
+   * {@code Map}-argument constructor, passing the configured per-tag rates.
+   *
+   * @return the default rate limiter
+   * @throws SamzaException if the class cannot be loaded, is not a {@link RateLimiter},
+   *                        or cannot be instantiated
+   */
+  private RateLimiter createDefaultRateLimiter() {
+    try {
+      // asSubclass() performs a checked conversion instead of an unchecked generic cast
+      Class<? extends RateLimiter> clazz =
+          Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME).asSubclass(RateLimiter.class);
+      Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
+      return ctor.newInstance(tagCreditsMap);
+    } catch (Exception ex) {
+      throw new SamzaException("Failed to create default rate limiter", ex);
+    }
+  }
+
+  /**
+   * Use specified TableReadFunction with remote table.
+   * @param readFn read function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.readFn = readFn;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table.
+   * @param writeFn write function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    this.writeFn = writeFn;
+    return this;
+  }
+
+  /**
+   * Use specified TableReadFunction with remote table and a retry policy.
+   * @param readFn read function instance
+   * @param retryPolicy retry policy for the read function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.readFn = readFn;
+    this.readRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table and a retry policy.
+   * @param writeFn write function instance
+   * @param retryPolicy retry policy for the write function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.writeFn = writeFn;
+    this.writeRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
+   * of credits to be charged from the rate limiter for table read and write operations.
+   * This is an advanced API that provides greater flexibility to throttle each record in the table
+   * with different number of credits. For most common use-cases eg: limit the number of read/write
+   * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
+   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
+   *
+   * <p> The credit functions are not null-checked here; a credit function left null is simply
+   * not stored in the table spec.
+   *
+   * @param rateLimiter rate limiter instance to be used for throttling
+   * @param readCreditFn credit function for rate limiting read operations
+   * @param writeCreditFn credit function for rate limiting write operations
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
+      TableRateLimiter.CreditFunction<K, V> readCreditFn,
+      TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
+    Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.writeCreditFn = writeCreditFn;
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table read operations. If the read rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for read operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
+    tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table write operations. If the write rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for write operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
+    tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the size of the thread pool for the executor used to execute
+   * callbacks of CompletableFutures of async Table operations. By default, these
+   * futures are completed (called) by the threads of the native store client. Depending
+   * on the implementation of the native client, it may or may not allow executing long
+   * running operations in the callbacks. This config can be used to execute the callbacks
+   * from a separate executor to decouple from the native client. If configured, this
+   * thread pool is shared by all read and write operations.
+   * @param poolSize max number of threads in the executor for async callbacks; values above 20
+   *                 are rejected when the descriptor is validated
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
+    this.asyncCallbackPoolSize = poolSize;
+    return this;
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly: a read function must be set,
+   * a custom rate limiter and read/write rates must not both be configured, and the async
+   * callback pool size must not exceed 20.
+   */
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+    Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
+        "Only one of rateLimiter instance or read/write limits can be specified");
+    // Assume callback executor pool should have no more than 20 threads
+    Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
+        "too many threads for async callback executor.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
deleted file mode 100644
index 8640b2a..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.descriptors;
-
-import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
-
-/**
- * A table provider provides the implementation for a table. It ensures a table is
- * properly constructed and also manages its lifecycle.
- */
-@InterfaceStability.Unstable
-public interface TableProvider {
- /**
- * Initialize TableProvider with container and task context
- * @param context context for the task
- */
- void init(Context context);
-
- /**
- * Get an instance of the table for read/write operations
- * @return the underlying table
- */
- Table getTable();
-
- /**
- * Generate any configuration for this table, the generated configuration
- * is used by Samza container to construct this table and any components
- * necessary. Instead of manipulating the input parameters, this method
- * should return the generated configuration.
- *
- * @param jobConfig the job config
- * @param generatedConfig config generated by earlier processing, but has
- * not yet been merged to job config
- * @return configuration for this table
- */
- Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
-
- /**
- * Shutdown the underlying table
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
deleted file mode 100644
index 2f9a607..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.descriptors;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Factory of a table provider object
- */
-@InterfaceStability.Unstable
-public interface TableProviderFactory {
- /**
- * Constructs an instances of the table provider based on a given table spec
- * @param tableSpec the table spec
- * @return the table provider
- */
- TableProvider getTableProvider(TableSpec tableSpec);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
new file mode 100644
index 0000000..c67a648
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.util.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Helper class for remote table to throttle table IO requests with the configured rate limiter.
+ * For each request, the needed credits are calculated with the configured credit functions.
+ * The throttle methods are overloaded to support the possible CRUD operations.
+ *
+ * <p> Throttling is active only when a rate limiter is supplied and that rate limiter supports
+ * the tag given at construction time; otherwise every throttle call returns immediately.
+ *
+ * @param <K> type of the table key
+ * @param <V> type of the table record
+ */
+public class TableRateLimiter<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(TableRateLimiter.class);
+
+  private final String tag;
+  private final boolean rateLimited;
+  private final CreditFunction<K, V> creditFn;
+
+  @VisibleForTesting
+  final RateLimiter rateLimiter;
+
+  // Optional; stays null until setTimerMetric() is called
+  private Timer waitTimeMetric;
+
+  /**
+   * Function interface for providing rate limiting credits for each table record.
+   * This interface allows callers to pass in lambda expressions which are otherwise
+   * non-serializable as-is.
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   */
+  @InterfaceStability.Unstable
+  public interface CreditFunction<K, V> extends Serializable {
+    /**
+     * Get the number of credits required for the {@code key} and {@code value} pair.
+     * @param key table key
+     * @param value table record
+     * @return number of credits
+     */
+    int getCredits(K key, V value);
+  }
+
+  /**
+   * @param tableId table id of the table to be rate limited
+   * @param rateLimiter actual rate limiter instance to be used; when null, rate limiting is disabled
+   * @param creditFn function for deriving the credits for each request; when null, each key or
+   *                 record is charged one credit
+   * @param tag tag to be used with the rate limiter
+   */
+  public TableRateLimiter(String tableId, RateLimiter rateLimiter, CreditFunction<K, V> creditFn, String tag) {
+    this.rateLimiter = rateLimiter;
+    this.creditFn = creditFn;
+    this.tag = tag;
+    this.rateLimited = rateLimiter != null && rateLimiter.getSupportedTags().contains(tag);
+    LOG.info("Rate limiting is {} for {}", rateLimited ? "enabled" : "disabled", tableId);
+  }
+
+  /**
+   * Set up waitTimeMetric metric for latency reporting due to throttling.
+   * @param timer waitTimeMetric metric; must not be null
+   */
+  public void setTimerMetric(Timer timer) {
+    Preconditions.checkNotNull(timer);
+    this.waitTimeMetric = timer;
+  }
+
+  /**
+   * @return credits for a single key/value pair: one when no credit function is configured,
+   *         otherwise the value computed by the credit function
+   */
+  int getCredits(K key, V value) {
+    return (creditFn == null) ? 1 : creditFn.getCredits(key, value);
+  }
+
+  /**
+   * @return total credits for the given keys: the number of keys when no credit function is
+   *         configured, otherwise the sum of the credit function over each key with a null value
+   */
+  int getCredits(Collection<K> keys) {
+    if (creditFn == null) {
+      return keys.size();
+    } else {
+      return keys.stream().mapToInt(k -> creditFn.getCredits(k, null)).sum();
+    }
+  }
+
+  /**
+   * @return total credits for the given entries: the number of entries when no credit function is
+   *         configured, otherwise the sum of the credit function over each key/value pair
+   */
+  int getEntryCredits(Collection<Entry<K, V>> entries) {
+    if (creditFn == null) {
+      return entries.size();
+    } else {
+      return entries.stream().mapToInt(e -> creditFn.getCredits(e.getKey(), e.getValue())).sum();
+    }
+  }
+
+  /**
+   * Acquires {@code credits} for this limiter's tag from the rate limiter and, when a timer
+   * metric has been set, records the elapsed time in nanoseconds. Does nothing when rate
+   * limiting is disabled.
+   */
+  private void throttle(int credits) {
+    if (!rateLimited) {
+      return;
+    }
+
+    long startNs = System.nanoTime();
+    rateLimiter.acquire(Collections.singletonMap(tag, credits));
+    // The timer is only assigned through setTimerMetric(); guard against it never being
+    // called so that throttling does not fail with a NullPointerException.
+    if (waitTimeMetric != null) {
+      waitTimeMetric.update(System.nanoTime() - startNs);
+    }
+  }
+
+  /**
+   * Throttle a request with a key argument if necessary.
+   * @param key key used for the table request
+   */
+  public void throttle(K key) {
+    throttle(getCredits(key, null));
+  }
+
+  /**
+   * Throttle a request with both the key and value arguments if necessary.
+   * @param key key used for the table request
+   * @param value value used for the table request
+   */
+  public void throttle(K key, V value) {
+    throttle(getCredits(key, value));
+  }
+
+  /**
+   * Throttle a request with a collection of keys as the argument if necessary.
+   * @param keys collection of keys used for the table request
+   */
+  public void throttle(Collection<K> keys) {
+    throttle(getCredits(keys));
+  }
+
+  /**
+   * Throttle a request with a collection of table records as the argument if necessary.
+   * @param records collection of records used for the table request
+   */
+  public void throttleRecords(Collection<Entry<K, V>> records) {
+    throttle(getEntryCredits(records));
+  }
+
+  /**
+   * @return whether rate limiting is enabled for the associated table
+   */
+  public boolean isRateLimited() {
+    return rateLimited;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
new file mode 100644
index 0000000..d54f83d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * A function object to be used with a remote readable table implementation. It encapsulates the functionality
+ * of reading table record(s) for a provided set of key(s).
+ *
+ * <p> Instances of {@link TableReadFunction} are meant to be serializable. ie. any non-serializable state
+ * (eg: network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+  /**
+   * Fetch single table record for a specified {@code key}. This method must be thread-safe.
+   * The default implementation calls getAsync and blocks on the completion afterwards.
+   * @param key key for the table record
+   * @return table record for the specified {@code key}
+   * @throws SamzaException if the lookup fails or the calling thread is interrupted while
+   *                        waiting; in the latter case the interrupt status is restored
+   */
+  default V get(K key) {
+    try {
+      return getAsync(key).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new SamzaException("GET failed for " + key, e);
+    } catch (ExecutionException e) {
+      throw new SamzaException("GET failed for " + key, e);
+    }
+  }
+
+  /**
+   * Asynchronously fetch single table record for a specified {@code key}. This method must be thread-safe.
+   * @param key key for the table record
+   * @return CompletableFuture for the get request
+   */
+  CompletableFuture<V> getAsync(K key);
+
+  /**
+   * Fetch the table {@code records} for specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls getAllAsync and blocks on the completion afterwards.
+   * @param keys keys for the table records
+   * @return all records for the specified keys.
+   * @throws SamzaException if the lookup fails or the calling thread is interrupted while
+   *                        waiting; in the latter case the interrupt status is restored
+   */
+  default Map<K, V> getAll(Collection<K> keys) {
+    try {
+      return getAllAsync(keys).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new SamzaException("GET_ALL failed for " + keys, e);
+    } catch (ExecutionException e) {
+      throw new SamzaException("GET_ALL failed for " + keys, e);
+    }
+  }
+
+  /**
+   * Asynchronously fetch the table {@code records} for specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls getAsync once for each distinct key and return a combined future.
+   * Keys whose lookup completes with {@code null} are left out of the resulting map.
+   * @param keys keys for the table records
+   * @return CompletableFuture for the get request
+   */
+  default CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+    // distinct(): Collectors.toMap throws IllegalStateException when the same key occurs twice
+    Map<K, CompletableFuture<V>> getFutures = keys.stream().distinct().collect(
+        Collectors.toMap(k -> k, k -> getAsync(k)));
+
+    return CompletableFuture.allOf(
+        Iterables.toArray(getFutures.values(), CompletableFuture.class))
+        .thenApply(ignored ->
+            getFutures.entrySet()
+                .stream()
+                // Collectors.toMap throws NullPointerException on null values, so skip keys
+                // without a record; join() does not block here as all futures have completed
+                .filter(e -> e.getValue().join() != null)
+                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join())));
+  }
+
+  /**
+   * Determine whether the current operation can be retried with the last thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
+  // optionally implement readObject() to initialize transient states
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
new file mode 100644
index 0000000..1e3dc4c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.storage.kv.Entry;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * A function object to be used with a remote read/write table implementation. It encapsulates the functionality
+ * of writing table record(s) for a provided set of key(s) to the store.
+ *
+ * <p> Instances of {@link TableWriteFunction} are meant to be serializable. ie. any non-serializable state
+ * (eg: network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+  /**
+   * Store single table {@code record} with specified {@code key}. This method must be thread-safe.
+   * The default implementation calls putAsync and blocks on the completion afterwards.
+   *
+   * @param key key for the table record
+   * @param record table record to be written
+   * @throws SamzaException if the write fails or the calling thread is interrupted while
+   *                        waiting; in the latter case the interrupt status is restored
+   */
+  default void put(K key, V record) {
+    try {
+      putAsync(key, record).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new SamzaException("PUT failed for " + key, e);
+    } catch (ExecutionException e) {
+      throw new SamzaException("PUT failed for " + key, e);
+    }
+  }
+
+  /**
+   * Asynchronously store single table {@code record} with specified {@code key}. This method must be thread-safe.
+   * @param key key for the table record
+   * @param record table record to be written
+   * @return CompletableFuture for the put request
+   */
+  CompletableFuture<Void> putAsync(K key, V record);
+
+  /**
+   * Store the table {@code records} with specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls putAllAsync and blocks on the completion afterwards.
+   * @param records table records to be written
+   * @throws SamzaException if the write fails or the calling thread is interrupted while
+   *                        waiting; in the latter case the interrupt status is restored
+   */
+  default void putAll(List<Entry<K, V>> records) {
+    try {
+      putAllAsync(records).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new SamzaException("PUT_ALL failed for " + records, e);
+    } catch (ExecutionException e) {
+      throw new SamzaException("PUT_ALL failed for " + records, e);
+    }
+  }
+
+  /**
+   * Asynchronously store the table {@code records} with specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls putAsync for each entry and return a combined future.
+   * @param records table records to be written
+   * @return CompletableFuture for the put request
+   */
+  default CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) {
+    List<CompletableFuture<Void>> putFutures =
+        records.stream().map(e -> putAsync(e.getKey(), e.getValue())).collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(putFutures, CompletableFuture.class));
+  }
+
+  /**
+   * Delete the {@code record} with specified {@code key} from the remote store.
+   * The default implementation calls deleteAsync and blocks on the completion afterwards.
+   * @param key key to the table record to be deleted
+   * @throws SamzaException if the delete fails or the calling thread is interrupted while
+   *                        waiting; in the latter case the interrupt status is restored
+   */
+  default void delete(K key) {
+    try {
+      deleteAsync(key).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new SamzaException("DELETE failed for " + key, e);
+    } catch (ExecutionException e) {
+      throw new SamzaException("DELETE failed for " + key, e);
+    }
+  }
+
+  /**
+   * Asynchronously delete the {@code record} with specified {@code key} from the remote store
+   * @param key key to the table record to be deleted
+   * @return CompletableFuture for the delete request
+   */
+  CompletableFuture<Void> deleteAsync(K key);
+
+  /**
+   * Delete all {@code records} with the specified {@code keys} from the remote store
+   * The default implementation calls deleteAllAsync and blocks on the completion afterwards.
+   * @param keys keys for the table records to be deleted
+   * @throws SamzaException if the delete fails or the calling thread is interrupted while
+   *                        waiting; in the latter case the interrupt status is restored
+   */
+  default void deleteAll(Collection<K> keys) {
+    try {
+      deleteAllAsync(keys).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new SamzaException("DELETE_ALL failed for " + keys, e);
+    } catch (ExecutionException e) {
+      // Named DELETE_ALL so that bulk failures can be told apart from single-key deletes
+      throw new SamzaException("DELETE_ALL failed for " + keys, e);
+    }
+  }
+
+  /**
+   * Asynchronously delete all {@code records} with the specified {@code keys} from the remote store.
+   * The default implementation calls deleteAsync for each key and return a combined future.
+   *
+   * @param keys keys for the table records to be deleted
+   * @return CompletableFuture for the deleteAll request
+   */
+  default CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
+    List<CompletableFuture<Void>> deleteFutures =
+        keys.stream().map(this::deleteAsync).collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(deleteFutures, CompletableFuture.class));
+  }
+
+  /**
+   * Determine whether the current operation can be retried with the last thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
+  /**
+   * Flush the remote store (optional). The default implementation does nothing.
+   */
+  default void flush() {
+  }
+
+  // optionally implement readObject() to initialize transient states
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
new file mode 100644
index 0000000..162eb07
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Common retry policy parameters for table IO. This serves as an abstraction on top of
+ * retry libraries. This common policy supports below features:
+ * - backoff modes: fixed, random, exponential
+ * - termination modes: by attempts, by duration
+ * - jitter
+ *
+ * Retry libraries can implement a subset or all features as described by this common policy.
+ */
+public class TableRetryPolicy implements Serializable {
+ enum BackoffType {
+ /**
+ * No backoff in between two retry attempts.
+ */
+ NONE,
+
+ /**
+ * Backoff by a fixed duration {@code sleepTime}.
+ */
+ FIXED,
+
+ /**
+ * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}.
+ */
+ RANDOM,
+
+ /**
+ * Backoff by exponentially increasing durations by {@code exponentialFactor} starting from {@code sleepTime}.
+ */
+ EXPONENTIAL
+ }
+
+ // Backoff parameters
+ private Duration sleepTime;
+ private Duration randomMin;
+ private Duration randomMax;
+ private double exponentialFactor;
+ private Duration exponentialMaxSleep;
+ private Duration jitter;
+
+ // By default no early termination
+ private Integer maxAttempts = null;
+ private Duration maxDuration = null;
+
+ // By default no backoff during retries
+ private BackoffType backoffType = BackoffType.NONE;
+
+ /**
+ * Serializable adapter interface for {@link java.util.function.Predicate}.
+ * This is needed because TableRetryPolicy needs to be serializable as part of the
+ * table config whereas {@link java.util.function.Predicate} is not serializable.
+ */
+ public interface RetryPredicate extends Predicate<Throwable>, Serializable {
+ }
+
+ // By default no custom retry predicate so retry decision is made solely by the table functions
+ private RetryPredicate retryPredicate = (ex) -> false;
+
+ /**
+ * Set the sleepTime time for the fixed backoff policy.
+ * @param sleepTime sleepTime time
+ * @return this policy instance
+ */
+ public TableRetryPolicy withFixedBackoff(Duration sleepTime) {
+ Preconditions.checkNotNull(sleepTime);
+ this.sleepTime = sleepTime;
+ this.backoffType = BackoffType.FIXED;
+ return this;
+ }
+
+ /**
+ * Set the sleepTime time for the random backoff policy. The actual sleepTime time
+ * before each attempt is randomly selected between {@code [minSleep, maxSleep]}
+ * @param minSleep lower bound sleepTime time
+ * @param maxSleep upper bound sleepTime time
+ * @return this policy instance
+ */
+ public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) {
+ Preconditions.checkNotNull(minSleep);
+ Preconditions.checkNotNull(maxSleep);
+ this.randomMin = minSleep;
+ this.randomMax = maxSleep;
+ this.backoffType = BackoffType.RANDOM;
+ return this;
+ }
+
+ /**
+ * Set the parameters for the exponential backoff policy. The actual sleepTime time
+ * is exponentially incremented up to the {@code maxSleep} and multiplying
+ * successive delays by the {@code factor}.
+ * @param sleepTime initial sleepTime time
+ * @param maxSleep upper bound sleepTime time
+ * @param factor exponential factor for backoff
+ * @return this policy instance
+ */
+ public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) {
+ Preconditions.checkNotNull(sleepTime);
+ Preconditions.checkNotNull(maxSleep);
+ this.sleepTime = sleepTime;
+ this.exponentialMaxSleep = maxSleep;
+ this.exponentialFactor = factor;
+ this.backoffType = BackoffType.EXPONENTIAL;
+ return this;
+ }
+
+ /**
+ * Set the jitter for the backoff policy to provide additional randomness.
+ * If this is set, a random value between {@code [0, jitter]} will be added
+ * to each sleepTime time. This applies to {@code FIXED} and {@code EXPONENTIAL}
+ * modes only.
+ * @param jitter initial sleepTime time
+ * @return this policy instance
+ */
+ public TableRetryPolicy withJitter(Duration jitter) {
+ Preconditions.checkNotNull(jitter);
+ if (backoffType != BackoffType.RANDOM) {
+ this.jitter = jitter;
+ }
+ return this;
+ }
+
+ /**
+ * Set maximum number of attempts before terminating the operation.
+ * @param maxAttempts number of attempts
+ * @return this policy instance
+ */
+ public TableRetryPolicy withStopAfterAttempts(int maxAttempts) {
+ Preconditions.checkArgument(maxAttempts >= 0);
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ /**
+ * Set maximum total delay (sleepTime + execution) before terminating the operation.
+ * @param maxDelay delay time
+ * @return this policy instance
+ */
+ public TableRetryPolicy withStopAfterDelay(Duration maxDelay) {
+ Preconditions.checkNotNull(maxDelay);
+ this.maxDuration = maxDelay;
+ return this;
+ }
+
+ /**
+ * Set the predicate to use for identifying retriable exceptions. If specified, table
+ * retry logic will consult both such predicate and table function and retry will be
+ * attempted if either option returns true.
+ * @param retryPredicate predicate for retriable exception identification
+ * @return this policy instance
+ */
+ public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) {
+ Preconditions.checkNotNull(retryPredicate);
+ this.retryPredicate = retryPredicate;
+ return this;
+ }
+
+ /**
+ * @return initial/fixed sleep time.
+ */
+ public Duration getSleepTime() {
+ return sleepTime;
+ }
+
+ /**
+ * @return lower sleepTime time for random backoff or null if {@code policyType} is not {@code RANDOM}.
+ */
+ public Duration getRandomMin() {
+ return randomMin;
+ }
+
+ /**
+ * @return upper sleepTime time for random backoff or null if {@code policyType} is not {@code RANDOM}.
+ */
+ public Duration getRandomMax() {
+ return randomMax;
+ }
+
+ /**
+ * @return exponential factor for exponential backoff.
+ */
+ public double getExponentialFactor() {
+ return exponentialFactor;
+ }
+
+ /**
+ * @return maximum sleepTime time for exponential backoff or null if {@code policyType} is not {@code EXPONENTIAL}.
+ */
+ public Duration getExponentialMaxSleep() {
+ return exponentialMaxSleep;
+ }
+
+ /**
+ * Introduce randomness to the sleepTime time.
+ * @return jitter to add on to each backoff or null if not set.
+ */
+ public Duration getJitter() {
+ return jitter;
+ }
+
+ /**
+ * Termination after a fix number of attempts.
+ * @return maximum number of attempts without success before giving up the operation or null if not set.
+ */
+ public Integer getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ /**
+ * Termination after a fixed duration.
+ * @return maximum duration without success before giving up the operation or null if not set.
+ */
+ public Duration getMaxDuration() {
+ return maxDuration;
+ }
+
+ /**
+ * @return type of the backoff.
+ */
+ public BackoffType getBackoffType() {
+ return backoffType;
+ }
+
+ /**
+ * @return Custom predicate for retriable exception identification or null if not specified.
+ */
+ public RetryPredicate getRetryPredicate() {
+ return retryPredicate;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
new file mode 100644
index 0000000..a7b66e5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+
+import org.apache.samza.SamzaException;
+
+
+ /**
+  * Static helpers that convert Java objects to and from Base64 strings using
+  * standard Java object serialization.
+  */
+ public final class SerdeUtils {
+   /**
+    * Helper method to serialize Java objects as Base64 strings
+    * @param name name of the object (for error reporting)
+    * @param object object to be serialized
+    * @return Base64 representation of the object
+    * @param <T> type of the object
+    * @throws SamzaException if the object cannot be written
+    */
+   public static <T> String serialize(String name, T object) {
+     try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+       oos.writeObject(object);
+       // The byte array is read before the streams are closed, so flush explicitly to make sure
+       // nothing is still buffered inside the ObjectOutputStream.
+       oos.flush();
+       return Base64.getEncoder().encodeToString(baos.toByteArray());
+     } catch (IOException e) {
+       throw new SamzaException("Failed to serialize " + name, e);
+     }
+   }
+
+   /**
+    * Helper method to deserialize Java objects from Base64 strings.
+    * NOTE(review): this uses native Java deserialization, so {@code strObject} is assumed to come
+    * from a trusted source - confirm against callers.
+    * @param name name of the object (for error reporting)
+    * @param strObject base64 string of the serialized object
+    * @return deserialized object instance
+    * @param <T> type of the object
+    * @throws SamzaException if the string cannot be decoded or the object cannot be read
+    */
+   @SuppressWarnings("unchecked") // T is chosen by the caller and cannot be checked here
+   public static <T> T deserialize(String name, String strObject) {
+     try {
+       byte[] bytes = Base64.getDecoder().decode(strObject);
+       // Close the object stream deterministically instead of leaving it to the garbage collector.
+       try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+         return (T) ois.readObject();
+       }
+     } catch (Exception e) {
+       String errMsg = "Failed to deserialize " + name;
+       throw new SamzaException(errMsg, e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
new file mode 100644
index 0000000..ea9acbd
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+ /**
+  * Unit tests for {@link TableRateLimiter}, covering credit calculation and throttling
+  * against a mocked {@link RateLimiter}.
+  */
+ public class TestTableRateLimiter {
+   private static final String DEFAULT_TAG = "mytag";
+
+   /**
+    * @return a helper created with {@code DEFAULT_TAG}, the only tag the mocked rate limiter supports
+    */
+   public TableRateLimiter<String, String> getThrottler() {
+     return getThrottler(DEFAULT_TAG);
+   }
+
+   /**
+    * Build a {@link TableRateLimiter} on top of a mocked {@link RateLimiter} that reports
+    * {@code DEFAULT_TAG} as its only supported tag. The credit function charges 3 credits for a
+    * non-null key plus 3 credits for a non-null value.
+    * @param tag tag handed to the {@link TableRateLimiter} constructor
+    * @return the helper, with a mocked timer metric already installed
+    */
+   public TableRateLimiter<String, String> getThrottler(String tag) {
+     TableRateLimiter.CreditFunction<String, String> creditFn =
+         (key, value) -> (key == null ? 0 : 3) + (value == null ? 0 : 3);
+     RateLimiter mockedLimiter = mock(RateLimiter.class);
+     doReturn(Collections.singleton(DEFAULT_TAG)).when(mockedLimiter).getSupportedTags();
+     TableRateLimiter<String, String> limiter = new TableRateLimiter<>("foo", mockedLimiter, creditFn, tag);
+     limiter.setTimerMetric(mock(Timer.class));
+     return limiter;
+   }
+
+   /** A key without a value costs the key's 3 credits only. */
+   @Test
+   public void testCreditKeyOnly() {
+     TableRateLimiter<String, String> limiter = getThrottler();
+     int credits = limiter.getCredits("abc", null);
+     Assert.assertEquals(3, credits);
+   }
+
+   /** A key together with a value costs 3 + 3 credits. */
+   @Test
+   public void testCreditKeyValue() {
+     TableRateLimiter<String, String> limiter = getThrottler();
+     int credits = limiter.getCredits("abc", "efg");
+     Assert.assertEquals(6, credits);
+   }
+
+   /** Three keys cost 3 credits each. */
+   @Test
+   public void testCreditKeys() {
+     TableRateLimiter<String, String> limiter = getThrottler();
+     int credits = limiter.getCredits(Arrays.asList("abc", "efg", "hij"));
+     Assert.assertEquals(9, credits);
+   }
+
+   /** Two key/value entries cost 6 credits each. */
+   @Test
+   public void testCreditEntries() {
+     TableRateLimiter<String, String> limiter = getThrottler();
+     int credits = limiter.getEntryCredits(
+         Arrays.asList(new Entry<>("abc", "efg"), new Entry<>("hij", "lmn")));
+     Assert.assertEquals(12, credits);
+   }
+
+   /** Throttling with a supported tag acquires from the rate limiter once and updates the timer once. */
+   @Test
+   public void testThrottle() {
+     TableRateLimiter<String, String> limiter = getThrottler();
+     Timer timerMetric = mock(Timer.class);
+     limiter.setTimerMetric(timerMetric);
+     limiter.throttle("foo");
+     verify(limiter.rateLimiter, times(1)).acquire(anyMap());
+     verify(timerMetric, times(1)).update(anyLong());
+   }
+
+   /** Throttling with a tag the rate limiter does not support never acquires from it. */
+   @Test
+   public void testThrottleUnknownTag() {
+     TableRateLimiter<String, String> limiter = getThrottler("unknown_tag");
+     limiter.throttle("foo");
+     verify(limiter.rateLimiter, times(0)).acquire(anyMap());
+   }
+ }
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
index 359c8f0..d3c283c 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -40,7 +40,7 @@ import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
+import org.apache.samza.table.descriptors.HybridTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -362,9 +362,9 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
|| tableDescriptors.get(tableId) == tableDescriptor,
String.format("Cannot add multiple table descriptors with the same tableId: %s", tableId));
- if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+ if (tableDescriptor instanceof HybridTableDescriptor) {
List<? extends TableDescriptor> tableDescs =
- ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+ ((HybridTableDescriptor) tableDescriptor).getTableDescriptors();
tableDescs.forEach(td -> addTableDescriptor(td));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
index 06f5606..3a3005a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+
/**
* A helper class for handling table configuration
*/
@@ -59,9 +60,9 @@ public class JavaTableConfig extends MapConfig {
}
/**
- * Get the {@link org.apache.samza.table.descriptors.TableProviderFactory} class for a table
+ * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
* @param tableId Id of the table
- * @return the {@link org.apache.samza.table.descriptors.TableProviderFactory} class name
+ * @return the {@link org.apache.samza.table.TableProviderFactory} class name
*/
public String getTableProviderFactory(String tableId) {
return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);