Posted to commits@samza.apache.org by we...@apache.org on 2018/11/08 22:04:40 UTC

[4/4] samza git commit: SAMZA-1981: Consolidate table descriptors to samza-api

SAMZA-1981: Consolidate table descriptors to samza-api

As per the subject, the table descriptors moved are as follows (an illustrative usage sketch follows the list):
 - LocalTableDescriptor
 - RemoteTableDescriptor
 - HybridTableDescriptor
 - GuavaCacheTableDescriptor
 - CachingTableDescriptor
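
For illustration only (not part of this patch), application code would now pull these descriptors directly from samza-api; a minimal sketch, where the table id, rate limit, and read function are placeholders:

    import org.apache.samza.table.descriptors.RemoteTableDescriptor;
    import org.apache.samza.table.remote.TableReadFunction;

    public class Example {
      // Build a remote table descriptor using only samza-api types.
      public static RemoteTableDescriptor<String, String> describe(TableReadFunction<String, String> readFn) {
        return new RemoteTableDescriptor<String, String>("my-remote-table")
            .withReadFunction(readFn)
            .withReadRateLimit(100);
      }
    }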

Author: Wei Song <ws...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #799 from weisong44/SAMZA-1981


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3da75e61
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3da75e61
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3da75e61

Branch: refs/heads/master
Commit: 3da75e61ab61dad8aacd31dd38513e4edaa9d36a
Parents: 2159171
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Nov 8 14:04:28 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Nov 8 14:04:28 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/table/TableProvider.java   |  61 ++++
 .../samza/table/TableProviderFactory.java       |  35 +++
 .../table/descriptors/BaseTableDescriptor.java  | 110 +++++++
 .../descriptors/CachingTableDescriptor.java     | 173 +++++++++++
 .../descriptors/GuavaCacheTableDescriptor.java  |  79 +++++
 .../descriptors/HybridTableDescriptor.java      |  48 +++
 .../table/descriptors/LocalTableDescriptor.java | 168 +++++++++++
 .../descriptors/RemoteTableDescriptor.java      | 296 +++++++++++++++++++
 .../samza/table/descriptors/TableProvider.java  |  62 ----
 .../table/descriptors/TableProviderFactory.java |  36 ---
 .../samza/table/remote/TableRateLimiter.java    | 167 +++++++++++
 .../samza/table/remote/TableReadFunction.java   | 111 +++++++
 .../samza/table/remote/TableWriteFunction.java  | 159 ++++++++++
 .../samza/table/retry/TableRetryPolicy.java     | 257 ++++++++++++++++
 .../apache/samza/table/utils/SerdeUtils.java    |  66 +++++
 .../table/remote/TestTableRateLimiter.java      | 103 +++++++
 .../descriptors/ApplicationDescriptorImpl.java  |   6 +-
 .../apache/samza/config/JavaTableConfig.java    |   5 +-
 .../apache/samza/table/BaseTableProvider.java   |  71 +++++
 .../samza/table/TableConfigGenerator.java       |   2 -
 .../org/apache/samza/table/TableManager.java    |   2 -
 .../table/caching/CachingTableProvider.java     |  98 ++++++
 .../caching/CachingTableProviderFactory.java    |  34 +++
 .../descriptors/CachingTableDescriptor.java     | 166 -----------
 .../descriptors/CachingTableProvider.java       | 105 -------
 .../CachingTableProviderFactory.java            |  34 ---
 .../caching/guava/GuavaCacheTableProvider.java  |  59 ++++
 .../guava/GuavaCacheTableProviderFactory.java   |  34 +++
 .../descriptors/GuavaCacheTableDescriptor.java  |  75 -----
 .../descriptors/GuavaCacheTableProvider.java    |  60 ----
 .../GuavaCacheTableProviderFactory.java         |  34 ---
 .../descriptors/BaseHybridTableDescriptor.java  |  48 ---
 .../table/descriptors/BaseTableDescriptor.java  | 110 -------
 .../samza/table/remote/RemoteTableProvider.java | 190 ++++++++++++
 .../remote/RemoteTableProviderFactory.java      |  38 +++
 .../samza/table/remote/TableRateLimiter.java    | 167 -----------
 .../samza/table/remote/TableReadFunction.java   | 111 -------
 .../samza/table/remote/TableWriteFunction.java  | 159 ----------
 .../descriptors/RemoteTableDescriptor.java      | 275 -----------------
 .../remote/descriptors/RemoteTableProvider.java | 202 -------------
 .../descriptors/RemoteTableProviderFactory.java |  38 ---
 .../table/retry/RetriableReadFunction.java      |   2 +-
 .../table/retry/RetriableWriteFunction.java     |   2 +-
 .../samza/table/retry/TableRetryPolicy.java     | 257 ----------------
 .../apache/samza/table/utils/SerdeUtils.java    |  66 -----
 .../utils/descriptors/BaseTableProvider.java    |  73 -----
 .../TestJobNodeConfigurationGenerator.java      |   4 +-
 .../apache/samza/table/TestTableManager.java    |   2 -
 .../samza/table/caching/TestCachingTable.java   |  18 +-
 .../table/remote/TestTableRateLimiter.java      | 103 -------
 .../descriptors/TestRemoteTableDescriptor.java  |  14 +-
 .../descriptors/InMemoryTableDescriptor.java    |   4 +-
 .../descriptors/InMemoryTableProvider.java      |   4 +-
 .../InMemoryTableProviderFactory.java           |   4 +-
 .../descriptors/TestInMemoryTableProvider.java  |   2 +-
 .../kv/descriptors/RocksDbTableDescriptor.java  |   3 +-
 .../kv/descriptors/RocksDbTableProvider.java    |   3 +-
 .../RocksDbTableProviderFactory.java            |   4 +-
 .../descriptors/TestRocksDbTableDescriptor.java |  10 +-
 .../descriptors/TestRocksDbTableProvider.java   |   2 +-
 .../samza/storage/kv/LocalTableProvider.java    | 148 ++++++++++
 .../descriptors/BaseLocalTableDescriptor.java   | 168 -----------
 .../kv/descriptors/BaseLocalTableProvider.java  | 149 ----------
 .../descriptors/TestBaseLocalTableProvider.java | 150 ----------
 .../kv/descriptors/TestLocalTableProvider.java  | 152 ++++++++++
 .../sql/testutil/TestIOResolverFactory.java     |   6 +-
 .../samza/test/table/TestRemoteTable.java       |   6 +-
 67 files changed, 2708 insertions(+), 2702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
new file mode 100644
index 0000000..350324c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+
+/**
+ * A table provider provides the implementation for a table. It ensures a table is
+ * properly constructed and also manages its lifecycle.
+ */
+@InterfaceStability.Unstable
+public interface TableProvider {
+  /**
+   * Initialize TableProvider with container and task context
+   * @param context context for the task
+   */
+  void init(Context context);
+
+  /**
+   * Get an instance of the table for read/write operations
+   * @return the underlying table
+   */
+  Table getTable();
+
+  /**
+   * Generate any configuration for this table; the generated configuration
+   * is used by the Samza container to construct this table and any
+   * necessary components. Instead of mutating the input parameters, this
+   * method should return the generated configuration.
+   *
+   * @param jobConfig the job config
+   * @param generatedConfig config generated by earlier processing, but has
+   *                        not yet been merged to job config
+   * @return configuration for this table
+   */
+  Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
+
+  /**
+   * Shutdown the underlying table
+   */
+  void close();
+}
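
For context (illustrative only, not part of this patch), a minimal implementation of the interface above could look like the following; the class name and the no-op bodies are placeholders:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.samza.config.Config;
    import org.apache.samza.context.Context;
    import org.apache.samza.table.Table;
    import org.apache.samza.table.TableProvider;

    public class NoOpTableProvider implements TableProvider {
      private Table table;

      @Override
      public void init(Context context) {
        // Construct or look up the underlying table using the container/task context.
      }

      @Override
      public Table getTable() {
        return table;
      }

      @Override
      public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
        // Return only new entries; do not mutate the inputs.
        return Collections.emptyMap();
      }

      @Override
      public void close() {
        // Release any resources held by the underlying table.
      }
    }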

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
new file mode 100644
index 0000000..1bb0196
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Factory of a table provider object
+ */
+@InterfaceStability.Unstable
+public interface TableProviderFactory {
+  /**
+   * Constructs an instance of the table provider based on a given table spec
+   * @param tableSpec the table spec
+   * @return the table provider
+   */
+  TableProvider getTableProvider(TableSpec tableSpec);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
new file mode 100644
index 0000000..246216b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Base class for all table descriptor implementations.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
+    implements TableDescriptor<K, V, D> {
+
+  protected final String tableId;
+
+  protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
+
+  protected final Map<String, String> config = new HashMap<>();
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  protected BaseTableDescriptor(String tableId) {
+    this.tableId = tableId;
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    this.tableId = tableId;
+    this.serde = serde;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public D withConfig(String key, String value) {
+    config.put(key, value);
+    return (D) this;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getTableId() {
+    return tableId;
+  }
+
+  /**
+   * Get the serde assigned to this {@link TableDescriptor}
+   *
+   * @return {@link KVSerde} used by this table
+   */
+  public KVSerde<K, V> getSerde() {
+    return serde;
+  }
+
+  /**
+   * Generate config for {@link TableSpec}; this method is used internally.
+   * @param tableSpecConfig configuration for the {@link TableSpec}
+   */
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    tableSpecConfig.putAll(config);
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly; this method is used internally.
+   */
+  protected void validate() {
+  }
+
+  /**
+   * Create a {@link TableSpec} from this table descriptor; this method is used internally.
+   *
+   * @return the {@link TableSpec}
+   */
+  abstract public TableSpec getTableSpec();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
new file mode 100644
index 0000000..d6248c6
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for a caching table.
+ * @param <K> type of the key in the cache
+ * @param <V> type of the value in the cache
+ */
+public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
+
+  public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.caching.CachingTableProviderFactory";
+
+  public static final String REAL_TABLE_ID = "realTableId";
+  public static final String CACHE_TABLE_ID = "cacheTableId";
+  public static final String READ_TTL_MS = "readTtl";
+  public static final String WRITE_TTL_MS = "writeTtl";
+  public static final String CACHE_SIZE = "cacheSize";
+  public static final String WRITE_AROUND = "writeAround";
+
+  private Duration readTtl;
+  private Duration writeTtl;
+  private long cacheSize;
+  private TableDescriptor<K, V, ?> cache;
+  private TableDescriptor<K, V, ?> table;
+  private boolean isWriteAround;
+
+  /**
+   * Constructs a table descriptor instance with internal cache
+   *
+   * @param tableId Id of the table; it must conform to the pattern {@literal [\\d\\w-_]+}
+   * @param table target table descriptor
+   */
+  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
+    super(tableId);
+    this.table = table;
+  }
+
+  /**
+   * Constructs a table descriptor instance and specifies a cache (as a table descriptor)
+   * to be used for caching. Cache gets are not synchronized with puts, for better parallelism
+   * in the read path of the caching table. As such, the cache table implementation is
+   * expected to be thread-safe for concurrent access.
+   *
+   * @param tableId Id of the table; it must conform to the pattern {@literal [\\d\\w-_]+}
+   * @param table target table descriptor
+   * @param cache cache table descriptor
+   */
+  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
+      TableDescriptor<K, V, ?> cache) {
+    this(tableId, table);
+    this.cache = cache;
+  }
+
+  @Override
+  public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
+    return cache != null
+        ? Arrays.asList(cache, table)
+        : Arrays.asList(table);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    if (cache != null) {
+      tableSpecConfig.put(CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
+    } else {
+      if (readTtl != null) {
+        tableSpecConfig.put(READ_TTL_MS, String.valueOf(readTtl.toMillis()));
+      }
+      if (writeTtl != null) {
+        tableSpecConfig.put(WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
+      }
+      if (cacheSize > 0) {
+        tableSpecConfig.put(CACHE_SIZE, String.valueOf(cacheSize));
+      }
+    }
+
+    tableSpecConfig.put(REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
+    tableSpecConfig.put(WRITE_AROUND, String.valueOf(isWriteAround));
+
+    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
+  }
+
+  /**
+   * Specify the TTL for each read access, i.e. a record expires after
+   * the TTL duration has elapsed since the last read access of its key.
+   * @param readTtl read TTL
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withReadTtl(Duration readTtl) {
+    this.readTtl = readTtl;
+    return this;
+  }
+
+  /**
+   * Specify the TTL for each write access, i.e. a record expires after
+   * the TTL duration has elapsed since the last write access of its key.
+   * @param writeTtl write TTL
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withWriteTtl(Duration writeTtl) {
+    this.writeTtl = writeTtl;
+    return this;
+  }
+
+  /**
+   * Specify the max cache size for size-based eviction.
+   * @param cacheSize max size of the cache
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withCacheSize(long cacheSize) {
+    this.cacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * Enable the write-around policy, which bypasses writing to the cache
+   * for put operations. This is useful when put() is the
+   * dominant operation and get() has no locality with recent puts.
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withWriteAround() {
+    this.isWriteAround = true;
+    return this;
+  }
+
+  @Override
+  @VisibleForTesting
+  public void validate() {
+    super.validate();
+    Preconditions.checkNotNull(table, "Actual table is required.");
+    if (cache == null) {
+      Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
+    } else {
+      Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
+          "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
+    }
+  }
+}
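
As an illustration of the API above (not part of this patch), a caching table can wrap another table descriptor either with the built-in cache or with an explicit cache-side descriptor; the table ids, TTLs, and size below are arbitrary placeholders:

    import java.time.Duration;

    import org.apache.samza.table.descriptors.CachingTableDescriptor;
    import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
    import org.apache.samza.table.descriptors.TableDescriptor;

    public class CachingExamples {
      // Option 1: built-in cache configured via TTLs and size.
      public static <K, V> CachingTableDescriptor<K, V> withBuiltInCache(TableDescriptor<K, V, ?> remote) {
        return new CachingTableDescriptor<>("caching-table", remote)
            .withReadTtl(Duration.ofMinutes(5))
            .withWriteTtl(Duration.ofMinutes(5))
            .withCacheSize(10_000);
      }

      // Option 2: an explicit cache-side table descriptor (no TTL/size on the caching table itself).
      public static <K, V> CachingTableDescriptor<K, V> withExplicitCache(
          TableDescriptor<K, V, ?> remote, GuavaCacheTableDescriptor<K, V> cache) {
        return new CachingTableDescriptor<>("caching-table-2", remote, cache);
      }
    }

The write-around policy can additionally be enabled via withWriteAround() when puts dominate and reads have little locality with recent writes.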

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
new file mode 100644
index 0000000..192bd7e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.SerdeUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+
+
+/**
+ * Table descriptor for Guava-based caching table.
+ * @param <K> type of the key in the cache
+ * @param <V> type of the value in the cache
+ */
+public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, GuavaCacheTableDescriptor<K, V>> {
+
+  public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.caching.guava.GuavaCacheTableProviderFactory";
+
+  public static final String GUAVA_CACHE = "guavaCache";
+
+  private Cache<K, V> cache;
+
+  /**
+   * {@inheritDoc}
+   */
+  public GuavaCacheTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    tableSpecConfig.put(GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
+
+    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
+  }
+
+  /**
+   * Specify a pre-configured Guava cache instance to be used by this table.
+   * @param cache Guava cache instance
+   * @return this descriptor
+   */
+  public GuavaCacheTableDescriptor withCache(Cache<K, V> cache) {
+    this.cache = cache;
+    return this;
+  }
+
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
+  }
+}
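
For illustration (not part of this patch), the descriptor above is typically given a cache built with Guava's CacheBuilder; the id, size, and expiry below are arbitrary placeholders:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;

    public class GuavaCacheExample {
      public static GuavaCacheTableDescriptor<String, String> describe() {
        Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(10_000)
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build();
        GuavaCacheTableDescriptor<String, String> descriptor = new GuavaCacheTableDescriptor<>("guava-cache");
        descriptor.withCache(cache);
        return descriptor;
      }
    }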

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
new file mode 100644
index 0000000..ec8cc67
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import java.util.List;
+
+/**
+ * Base class for hybrid table descriptors. A hybrid table is backed by one or more
+ * underlying tables, and it orchestrates operations across them to achieve more advanced
+ * functionality.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ * @param <D> the type of this table descriptor
+ */
+abstract public class HybridTableDescriptor<K, V, D extends HybridTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  /**
+   * {@inheritDoc}
+   */
+  public HybridTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Get the descriptors of the tables contained within this hybrid table.
+   * @return list of table descriptors
+   */
+  abstract public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
new file mode 100644
index 0000000..dfcaea4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.SideInputsProcessor;
+
+
+/**
+ * Table descriptor for local, store-backed tables.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
+  static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
+  static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
+
+  protected List<String> sideInputs;
+  protected SideInputsProcessor sideInputsProcessor;
+  protected boolean enableChangelog;
+  protected String changelogStream;
+  protected Integer changelogReplicationFactor;
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  public LocalTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  public LocalTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  public D withSideInputs(List<String> sideInputs) {
+    this.sideInputs = sideInputs;
+    // Disable changelog
+    this.enableChangelog = false;
+    this.changelogStream = null;
+    this.changelogReplicationFactor = null;
+    return (D) this;
+  }
+
+  public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
+    this.sideInputsProcessor = sideInputsProcessor;
+    return (D) this;
+  }
+
+  /**
+   * Enable changelog for this table; by default the changelog is disabled. When the
+   * changelog stream name is not specified, it is automatically generated in
+   * the format {@literal [job-name]-[job-id]-table-[table-id]}.
+   * Refer to <code>stores.store-name.changelog</code> in the Samza configuration guide.
+   *
+   * @return this table descriptor instance
+   */
+  public D withChangelogEnabled() {
+    this.enableChangelog = true;
+    return (D) this;
+  }
+
+  /**
+   * Samza stores are local to a container. If the container fails, the contents of
+   * the store are lost. To prevent loss of data, you need to set this property to
+   * configure a changelog stream: Samza then ensures that writes to the store are
+   * replicated to this stream, and the store is restored from this stream after a
+   * failure. The value of this property is given in the form system-name.stream-name.
+   * The "system-name" part is optional. If it is omitted you must specify the system
+   * in <code>job.changelog.system</code> config. Any output stream can be used as
+   * changelog, but you must ensure that only one job ever writes to a given changelog
+   * stream (each instance of a job and each store needs its own changelog stream).
+   * <p>
+   * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+   *
+   * @param changelogStream changelog stream name
+   * @return this table descriptor instance
+   */
+  public D withChangelogStream(String changelogStream) {
+    this.enableChangelog = true;
+    this.changelogStream = changelogStream;
+    return (D) this;
+  }
+
+  /**
+   * This property defines the number of replicas to use for the changelog stream.
+   * <p>
+   * Default value is <code>stores.default.changelog.replication.factor</code>.
+   * <p>
+   * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide
+   *
+   * @param replicationFactor replication factor
+   * @return this table descriptor instance
+   */
+  public D withChangelogReplicationFactor(int replicationFactor) {
+    this.enableChangelog = true;
+    this.changelogReplicationFactor = replicationFactor;
+    return (D) this;
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+
+    tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
+    if (enableChangelog) {
+      if (changelogStream != null) {
+        tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
+      }
+      if (changelogReplicationFactor != null) {
+        tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
+      }
+    }
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly
+   */
+  @Override
+  protected void validate() {
+    super.validate();
+    if (sideInputs != null || sideInputsProcessor != null) {
+      Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
+          String.format("Invalid side input configuration for table: %s. " +
+              "Both side inputs and the processor must be provided", tableId));
+    }
+    if (!enableChangelog) {
+      Preconditions.checkState(changelogStream == null,
+          String.format("Invalid changelog configuration for table: %s. Changelog " +
+              "must be enabled when a changelog stream name is provided", tableId));
+      Preconditions.checkState(changelogReplicationFactor == null,
+          String.format("Invalid changelog configuration for table: %s. Changelog " +
+              "must be enabled when a changelog replication factor is provided", tableId));
+    }
+  }
+
+}
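
For illustration (not part of this patch), the changelog-related fluent methods above apply to any concrete LocalTableDescriptor subclass (e.g. a RocksDB or in-memory descriptor constructed elsewhere); the stream name and replication factor below are placeholders:

    import org.apache.samza.table.descriptors.LocalTableDescriptor;

    public class LocalTableExample {
      // Enable a changelog on an existing concrete local table descriptor.
      public static <K, V, D extends LocalTableDescriptor<K, V, D>> D withChangelog(D descriptor) {
        return descriptor
            .withChangelogStream("kafka.my-table-changelog")
            .withChangelogReplicationFactor(3);
      }
    }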

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
new file mode 100644
index 0000000..8b34ec9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for tables backed by remote stores.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
+
+  public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.remote.RemoteTableProviderFactory";
+
+  public static final String DEFAULT_RATE_LIMITER_CLASS_NAME = "org.apache.samza.util.EmbeddedTaggedRateLimiter";
+
+  /**
+ * Tag to be used for provisioning credits for rate-limiting read operations from the remote table.
+   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+   * TableRateLimiter.CreditFunction)}
+   */
+  public static final String RL_READ_TAG = "readTag";
+
+  /**
+ * Tag to be used for provisioning credits for rate-limiting write operations into the remote table.
+   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
+ * TableRateLimiter.CreditFunction)}, if write functionality is needed.
+   */
+  public static final String RL_WRITE_TAG = "writeTag";
+
+  public static final String READ_FN = "io.read.func";
+  public static final String WRITE_FN = "io.write.func";
+  public static final String RATE_LIMITER = "io.ratelimiter";
+  public static final String READ_CREDIT_FN = "io.read.credit.func";
+  public static final String WRITE_CREDIT_FN = "io.write.credit.func";
+  public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+  public static final String READ_RETRY_POLICY = "io.read.retry.policy";
+  public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
+
+  // Input support for a specific remote store (required)
+  private TableReadFunction<K, V> readFn;
+
+  // Output support for a specific remote store (optional)
+  private TableWriteFunction<K, V> writeFn;
+
+  // Rate limiter for client-side throttling; it is set by withRateLimiter()
+  private RateLimiter rateLimiter;
+
+  // Rates for constructing the default rate limiter when they are non-zero
+  private Map<String, Integer> tagCreditsMap = new HashMap<>();
+
+  private TableRateLimiter.CreditFunction<K, V> readCreditFn;
+  private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
+
+  private TableRetryPolicy readRetryPolicy;
+  private TableRetryPolicy writeRetryPolicy;
+
+  // By default execute future callbacks on the native client threads
+  // ie. no additional thread pool for callbacks.
+  private int asyncCallbackPoolSize = -1;
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  public RemoteTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    // Serialize and store reader/writer functions
+    tableSpecConfig.put(READ_FN, SerdeUtils.serialize("read function", readFn));
+
+    if (writeFn != null) {
+      tableSpecConfig.put(WRITE_FN, SerdeUtils.serialize("write function", writeFn));
+    }
+
+    if (!tagCreditsMap.isEmpty()) {
+      RateLimiter defaultRateLimiter;
+      try {
+        Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
+        Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
+        defaultRateLimiter = ctor.newInstance(tagCreditsMap);
+      } catch (Exception ex) {
+        throw new SamzaException("Failed to create default rate limiter", ex);
+      }
+      tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter));
+    } else if (rateLimiter != null) {
+      tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
+    }
+
+    // Serialize the readCredit functions
+    if (readCreditFn != null) {
+      tableSpecConfig.put(READ_CREDIT_FN, SerdeUtils.serialize(
+          "read credit function", readCreditFn));
+    }
+    // Serialize the writeCredit functions
+    if (writeCreditFn != null) {
+      tableSpecConfig.put(WRITE_CREDIT_FN, SerdeUtils.serialize(
+          "write credit function", writeCreditFn));
+    }
+
+    if (readRetryPolicy != null) {
+      tableSpecConfig.put(READ_RETRY_POLICY, SerdeUtils.serialize(
+          "read retry policy", readRetryPolicy));
+    }
+
+    if (writeRetryPolicy != null) {
+      tableSpecConfig.put(WRITE_RETRY_POLICY, SerdeUtils.serialize(
+          "write retry policy", writeRetryPolicy));
+    }
+
+    tableSpecConfig.put(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
+
+    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
+  }
+
+  /**
+   * Use the specified TableReadFunction with the remote table.
+   * @param readFn read function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.readFn = readFn;
+    return this;
+  }
+
+  /**
+   * Use the specified TableWriteFunction with the remote table.
+   * @param writeFn write function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    this.writeFn = writeFn;
+    return this;
+  }
+
+  /**
+   * Use the specified TableReadFunction with the remote table, along with a retry policy.
+   * @param readFn read function instance
+   * @param retryPolicy retry policy for the read function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.readFn = readFn;
+    this.readRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Use the specified TableWriteFunction with the remote table, along with a retry policy.
+   * @param writeFn write function instance
+   * @param retryPolicy retry policy for the write function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.writeFn = writeFn;
+    this.writeRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
+   * of credits to be charged from the rate limiter for table read and write operations.
+   * This is an advanced API that provides greater flexibility to throttle each record in the table
+   * with a different number of credits. For the most common use cases, e.g. limiting the number of read/write
+   * operations, please instead use {@link RemoteTableDescriptor#withReadRateLimit(int)} and
+   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
+   *
+   * @param rateLimiter rate limiter instance to be used for throttling
+   * @param readCreditFn credit function for rate limiting read operations
+   * @param writeCreditFn credit function for rate limiting write operations
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
+      TableRateLimiter.CreditFunction<K, V> readCreditFn,
+      TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
+    Preconditions.checkNotNull(rateLimiter, "null rate limiter");
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.writeCreditFn = writeCreditFn;
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table read operations. If the read rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for read operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
+    tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table write operations. If the write rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
+   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for write operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
+    tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the size of the thread pool for the executor used to execute
+   * callbacks of CompletableFutures of async Table operations. By default, these
+   * futures are completed (called) by the threads of the native store client. Depending
+   * on the implementation of the native client, it may or may not allow executing long
+   * running operations in the callbacks. This config can be used to execute the callbacks
+   * from a separate executor to decouple from the native client. If configured, this
+   * thread pool is shared by all read and write operations.
+   * @param poolSize max number of threads in the executor for async callbacks
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
+    this.asyncCallbackPoolSize = poolSize;
+    return this;
+  }
+
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+    Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
+        "Only one of rateLimiter instance or read/write limits can be specified");
+    // Assume callback executor pool should have no more than 20 threads
+    Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
+        "too many threads for async callback executor.");
+  }
+}
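
For illustration (not part of this patch), a remote table is typically configured with read/write functions, rate limits, and optional retry policies using the descriptor above; the table id, limits, and pool size below are placeholders:

    import org.apache.samza.table.descriptors.RemoteTableDescriptor;
    import org.apache.samza.table.remote.TableReadFunction;
    import org.apache.samza.table.remote.TableWriteFunction;
    import org.apache.samza.table.retry.TableRetryPolicy;

    public class RemoteTableExample {
      public static <K, V> RemoteTableDescriptor<K, V> describe(
          TableReadFunction<K, V> readFn,
          TableWriteFunction<K, V> writeFn,
          TableRetryPolicy readRetryPolicy) {
        return new RemoteTableDescriptor<K, V>("my-remote-table")
            .withReadFunction(readFn, readRetryPolicy)
            .withWriteFunction(writeFn)
            .withReadRateLimit(200)
            .withWriteRateLimit(100)
            .withAsyncCallbackExecutorPoolSize(4);
      }
    }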

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
deleted file mode 100644
index 8640b2a..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.descriptors;
-
-import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
-
-/**
- * A table provider provides the implementation for a table. It ensures a table is
- * properly constructed and also manages its lifecycle.
- */
-@InterfaceStability.Unstable
-public interface TableProvider {
-  /**
-   * Initialize TableProvider with container and task context
-   * @param context context for the task
-   */
-  void init(Context context);
-
-  /**
-   * Get an instance of the table for read/write operations
-   * @return the underlying table
-   */
-  Table getTable();
-
-  /**
-   * Generate any configuration for this table, the generated configuration
-   * is used by Samza container to construct this table and any components
-   * necessary. Instead of manipulating the input parameters, this method
-   * should return the generated configuration.
-   *
-   * @param jobConfig the job config
-   * @param generatedConfig config generated by earlier processing, but has
-   *                        not yet been merged to job config
-   * @return configuration for this table
-   */
-  Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
-
-  /**
-   * Shutdown the underlying table
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
deleted file mode 100644
index 2f9a607..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.descriptors;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Factory of a table provider object
- */
-@InterfaceStability.Unstable
-public interface TableProviderFactory {
-  /**
-   * Constructs an instances of the table provider based on a given table spec
-   * @param tableSpec the table spec
-   * @return the table provider
-   */
-  TableProvider getTableProvider(TableSpec tableSpec);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
new file mode 100644
index 0000000..c67a648
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.util.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Helper class for remote table to throttle table IO requests with the configured rate limiter.
+ * For each request, the needed credits are calculated with the configured credit functions.
+ * The throttle methods are overloaded to support the possible CRUD operations.
+ *
+ * @param <K> type of the table key
+ * @param <V> type of the table record
+ */
+public class TableRateLimiter<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(TableRateLimiter.class);
+
+  private final String tag;
+  private final boolean rateLimited;
+  private final CreditFunction<K, V> creditFn;
+
+  @VisibleForTesting
+  final RateLimiter rateLimiter;
+
+  private Timer waitTimeMetric;
+
+  /**
+   * Function interface for providing rate limiting credits for each table record.
+   * This interface allows callers to pass in lambda expressions which are otherwise
+   * non-serializable as-is.
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   */
+  @InterfaceStability.Unstable
+  public interface CreditFunction<K, V> extends Serializable {
+    /**
+     * Get the number of credits required for the {@code key} and {@code value} pair.
+     * @param key table key
+     * @param value table record
+     * @return number of credits
+     */
+    int getCredits(K key, V value);
+  }
+
+  /**
+   * @param tableId table id of the table to be rate limited
+   * @param rateLimiter actual rate limiter instance to be used
+   * @param creditFn function for deriving the credits for each request
+   * @param tag tag to be used with the rate limiter
+   */
+  public TableRateLimiter(String tableId, RateLimiter rateLimiter, CreditFunction<K, V> creditFn, String tag) {
+    this.rateLimiter = rateLimiter;
+    this.creditFn = creditFn;
+    this.tag = tag;
+    this.rateLimited = rateLimiter != null && rateLimiter.getSupportedTags().contains(tag);
+    LOG.info("Rate limiting is {} for {}", rateLimited ? "enabled" : "disabled", tableId);
+  }
+
+  /**
+   * Set up the timer metric for reporting wait time (latency) incurred due to throttling.
+   * @param timer wait time metric
+   */
+  public void setTimerMetric(Timer timer) {
+    Preconditions.checkNotNull(timer);
+    this.waitTimeMetric = timer;
+  }
+
+  int getCredits(K key, V value) {
+    return (creditFn == null) ? 1 : creditFn.getCredits(key, value);
+  }
+
+  int getCredits(Collection<K> keys) {
+    if (creditFn == null) {
+      return keys.size();
+    } else {
+      return keys.stream().mapToInt(k -> creditFn.getCredits(k, null)).sum();
+    }
+  }
+
+  int getEntryCredits(Collection<Entry<K, V>> entries) {
+    if (creditFn == null) {
+      return entries.size();
+    } else {
+      return entries.stream().mapToInt(e -> creditFn.getCredits(e.getKey(), e.getValue())).sum();
+    }
+  }
+
+  private void throttle(int credits) {
+    if (!rateLimited) {
+      return;
+    }
+
+    long startNs = System.nanoTime();
+    rateLimiter.acquire(Collections.singletonMap(tag, credits));
+    waitTimeMetric.update(System.nanoTime() - startNs);
+  }
+
+  /**
+   * Throttle a request with a key argument if necessary.
+   * @param key key used for the table request
+   */
+  public void throttle(K key) {
+    throttle(getCredits(key, null));
+  }
+
+  /**
+   * Throttle a request with both the key and value arguments if necessary.
+   * @param key key used for the table request
+   * @param value value used for the table request
+   */
+  public void throttle(K key, V value) {
+    throttle(getCredits(key, value));
+  }
+
+  /**
+   * Throttle a request with a collection of keys as the argument if necessary.
+   * @param keys collection of keys used for the table request
+   */
+  public void throttle(Collection<K> keys) {
+    throttle(getCredits(keys));
+  }
+
+  /**
+   * Throttle a request with a collection of table records as the argument if necessary.
+   * @param records collection of records used for the table request
+   */
+  public void throttleRecords(Collection<Entry<K, V>> records) {
+    throttle(getEntryCredits(records));
+  }
+
+  /**
+   * @return whether rate limiting is enabled for the associated table
+   */
+  public boolean isRateLimited() {
+    return rateLimited;
+  }
+}
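
For illustration, here is a minimal sketch of a CreditFunction implementation that charges
credits in proportion to the value payload size; the class name and per-KB weighting are
hypothetical and not part of this patch.

import org.apache.samza.table.remote.TableRateLimiter;

// Hypothetical example: charge one credit for the key plus one credit per KB of value payload.
public class SizeBasedCreditFunction implements TableRateLimiter.CreditFunction<String, byte[]> {
  @Override
  public int getCredits(String key, byte[] value) {
    int credits = 1;                       // base cost for the key
    if (value != null) {
      credits += value.length / 1024;      // additional cost proportional to payload size
    }
    return credits;
  }
}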

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
new file mode 100644
index 0000000..d54f83d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * A function object to be used with a remote readable table implementation. It encapsulates the functionality
+ * of reading table record(s) for a provided set of key(s).
+ *
+ * <p> Instances of {@link TableReadFunction} are meant to be serializable, i.e. any non-serializable state
+ * (e.g. network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+  /**
+   * Fetch a single table record for the specified {@code key}. This method must be thread-safe.
+   * The default implementation calls getAsync and blocks until the returned future completes.
+   * @param key key for the table record
+   * @return table record for the specified {@code key}
+   */
+  default V get(K key) {
+    try {
+      return getAsync(key).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new SamzaException("GET failed for " + key, e);
+    }
+  }
+
+  /**
+   * Asynchronously fetch a single table record for the specified {@code key}. This method must be thread-safe.
+   * @param key key for the table record
+   * @return CompletableFuture for the get request
+   */
+  CompletableFuture<V> getAsync(K key);
+
+  /**
+   * Fetch the table {@code records} for the specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls getAllAsync and blocks until the returned future completes.
+   * @param keys keys for the table records
+   * @return all records for the specified keys.
+   */
+  default Map<K, V> getAll(Collection<K> keys) {
+    try {
+      return getAllAsync(keys).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new SamzaException("GET_ALL failed for " + keys, e);
+    }
+  }
+
+  /**
+   * Asynchronously fetch the table {@code records} for the specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls getAsync for each key and returns a combined future.
+   * @param keys keys for the table records
+   * @return CompletableFuture for the get request
+   */
+  default CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+    Map<K, CompletableFuture<V>> getFutures =  keys.stream().collect(
+        Collectors.toMap(k -> k, k -> getAsync(k)));
+
+    return CompletableFuture.allOf(
+        Iterables.toArray(getFutures.values(), CompletableFuture.class))
+        .thenApply(future ->
+          getFutures.entrySet()
+            .stream()
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join())));
+  }
+
+  /**
+   * Determine whether the current operation can be retried with the last thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
+  // optionally implement readObject() to initialize transient states
+}
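
For reference, a minimal sketch of a TableReadFunction implementation; the in-memory map
below stands in for a remote client, which a real implementation would keep as transient
state and recreate in readObject(). The class name is illustrative, not part of this patch.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.samza.table.remote.TableReadFunction;

public class InMemoryReadFunction implements TableReadFunction<String, String> {
  // Stand-in for a remote client (a real client would be transient and recreated in readObject()).
  private final Map<String, String> store = new ConcurrentHashMap<>();

  @Override
  public CompletableFuture<String> getAsync(String key) {
    // A real implementation would issue a non-blocking request to the remote store here.
    return CompletableFuture.supplyAsync(() -> store.get(key));
  }

  @Override
  public boolean isRetriable(Throwable exception) {
    // Nothing is retriable for an in-memory lookup.
    return false;
  }
}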

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
new file mode 100644
index 0000000..1e3dc4c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.storage.kv.Entry;
+
+import com.google.common.collect.Iterables;
+
+
+/**
+ * A function object to be used with a remote read/write table implementation. It encapsulates the functionality
+ * of writing table record(s) for a provided set of key(s) to the store.
+ *
+ * <p> Instances of {@link TableWriteFunction} are meant to be serializable, i.e. any non-serializable state
+ * (e.g. network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+  /**
+   * Store a single table {@code record} with the specified {@code key}. This method must be thread-safe.
+   * The default implementation calls putAsync and blocks until the returned future completes.
+   *
+   * @param key key for the table record
+   * @param record table record to be written
+   */
+  default void put(K key, V record) {
+    try {
+      putAsync(key, record).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new SamzaException("PUT failed for " + key, e);
+    }
+  }
+
+  /**
+   * Asynchronously store a single table {@code record} with the specified {@code key}. This method must be thread-safe.
+   * @param key key for the table record
+   * @param record table record to be written
+   * @return CompletableFuture for the put request
+   */
+  CompletableFuture<Void> putAsync(K key, V record);
+
+  /**
+   * Store the table {@code records} with the specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls putAllAsync and blocks until the returned future completes.
+   * @param records table records to be written
+   */
+  default void putAll(List<Entry<K, V>> records) {
+    try {
+      putAllAsync(records).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new SamzaException("PUT_ALL failed for " + records, e);
+    }
+  }
+
+  /**
+   * Asynchronously store the table {@code records} with the specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls putAsync for each entry and returns a combined future.
+   * @param records table records to be written
+   * @return CompletableFuture for the put request
+   */
+  default CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) {
+    List<CompletableFuture<Void>> putFutures =
+        records.stream().map(e -> putAsync(e.getKey(), e.getValue())).collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(putFutures, CompletableFuture.class));
+  }
+
+  /**
+   * Delete the {@code record} with the specified {@code key} from the remote store.
+   * The default implementation calls deleteAsync and blocks until the returned future completes.
+   * @param key key to the table record to be deleted
+   */
+  default void delete(K key) {
+    try {
+      deleteAsync(key).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new SamzaException("DELETE failed for " + key, e);
+    }
+  }
+
+  /**
+   * Asynchronously delete the {@code record} with the specified {@code key} from the remote store.
+   * @param key key to the table record to be deleted
+   * @return CompletableFuture for the delete request
+   */
+  CompletableFuture<Void> deleteAsync(K key);
+
+  /**
+   * Delete all {@code records} with the specified {@code keys} from the remote store.
+   * The default implementation calls deleteAllAsync and blocks until the returned future completes.
+   * @param keys keys for the table records to be deleted
+   */
+  default void deleteAll(Collection<K> keys) {
+    try {
+      deleteAllAsync(keys).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new SamzaException("DELETE failed for " + keys, e);
+    }
+  }
+
+  /**
+   * Asynchronously delete all {@code records} with the specified {@code keys} from the remote store.
+   * The default implementation calls deleteAsync for each key and returns a combined future.
+   *
+   * @param keys keys for the table records to be deleted
+   * @return CompletableFuture for the deleteAll request
+   */
+  default CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
+    List<CompletableFuture<Void>> deleteFutures =
+        keys.stream().map(this::deleteAsync).collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(deleteFutures, CompletableFuture.class));
+  }
+
+  /**
+   * Determine whether the current operation can be retried with the last thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
+  /**
+   * Flush the remote store (optional)
+   */
+  default void flush() {
+  }
+
+  // optionally implement readObject() to initialize transient states
+}
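
Similarly, a minimal sketch of a TableWriteFunction implementation, again using an in-memory
map as a stand-in for the remote store; the class name is illustrative, not part of this patch.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.samza.table.remote.TableWriteFunction;

public class InMemoryWriteFunction implements TableWriteFunction<String, String> {
  // Stand-in for a remote client (a real client would be transient and recreated in readObject()).
  private final Map<String, String> store = new ConcurrentHashMap<>();

  @Override
  public CompletableFuture<Void> putAsync(String key, String record) {
    return CompletableFuture.runAsync(() -> store.put(key, record));
  }

  @Override
  public CompletableFuture<Void> deleteAsync(String key) {
    return CompletableFuture.runAsync(() -> store.remove(key));
  }

  @Override
  public boolean isRetriable(Throwable exception) {
    // Nothing is retriable for an in-memory store.
    return false;
  }
}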

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
new file mode 100644
index 0000000..162eb07
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Common retry policy parameters for table IO. This serves as an abstraction on top of
+ * retry libraries. This common policy supports the following features:
+ *  - backoff modes: fixed, random, exponential
+ *  - termination modes: by attempts, by duration
+ *  - jitter
+ *
+ * Retry libraries may implement a subset or all of the features described by this common policy.
+ */
+public class TableRetryPolicy implements Serializable {
+  enum BackoffType {
+    /**
+     * No backoff in between two retry attempts.
+     */
+    NONE,
+
+    /**
+     * Backoff by a fixed duration {@code sleepTime}.
+     */
+    FIXED,
+
+    /**
+     * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}.
+     */
+    RANDOM,
+
+    /**
+     * Backoff by exponentially increasing durations by {@code exponentialFactor} starting from {@code sleepTime}.
+     */
+    EXPONENTIAL
+  }
+
+  // Backoff parameters
+  private Duration sleepTime;
+  private Duration randomMin;
+  private Duration randomMax;
+  private double exponentialFactor;
+  private Duration exponentialMaxSleep;
+  private Duration jitter;
+
+  // By default no early termination
+  private Integer maxAttempts = null;
+  private Duration maxDuration = null;
+
+  // By default no backoff during retries
+  private BackoffType backoffType = BackoffType.NONE;
+
+  /**
+   * Serializable adapter interface for {@link java.util.function.Predicate}.
+   * This is needed because TableRetryPolicy needs to be serializable as part of the
+   * table config whereas {@link java.util.function.Predicate} is not serializable.
+   */
+  public interface RetryPredicate extends Predicate<Throwable>, Serializable {
+  }
+
+  // By default no custom retry predicate so retry decision is made solely by the table functions
+  private RetryPredicate retryPredicate = (ex) -> false;
+
+  /**
+   * Set the sleep time for the fixed backoff policy.
+   * @param sleepTime sleep time between attempts
+   * @return this policy instance
+   */
+  public TableRetryPolicy withFixedBackoff(Duration sleepTime) {
+    Preconditions.checkNotNull(sleepTime);
+    this.sleepTime = sleepTime;
+    this.backoffType = BackoffType.FIXED;
+    return this;
+  }
+
+  /**
+   * Set the sleep time bounds for the random backoff policy. The actual sleep time
+   * before each attempt is randomly selected from {@code [minSleep, maxSleep]}.
+   * @param minSleep lower bound of the sleep time
+   * @param maxSleep upper bound of the sleep time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) {
+    Preconditions.checkNotNull(minSleep);
+    Preconditions.checkNotNull(maxSleep);
+    this.randomMin = minSleep;
+    this.randomMax = maxSleep;
+    this.backoffType = BackoffType.RANDOM;
+    return this;
+  }
+
+  /**
+   * Set the parameters for the exponential backoff policy. The actual sleep time starts
+   * at {@code sleepTime} and grows exponentially, multiplying successive delays by
+   * {@code factor}, up to the {@code maxSleep} upper bound.
+   * @param sleepTime initial sleep time
+   * @param maxSleep upper bound of the sleep time
+   * @param factor exponential factor for backoff
+   * @return this policy instance
+   */
+  public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) {
+    Preconditions.checkNotNull(sleepTime);
+    Preconditions.checkNotNull(maxSleep);
+    this.sleepTime = sleepTime;
+    this.exponentialMaxSleep = maxSleep;
+    this.exponentialFactor = factor;
+    this.backoffType = BackoffType.EXPONENTIAL;
+    return this;
+  }
+
+  /**
+   * Set the jitter for the backoff policy to provide additional randomness.
+   * If this is set, a random value between {@code [0, jitter]} will be added
+   * to each sleep time. This applies to {@code FIXED} and {@code EXPONENTIAL}
+   * modes only.
+   * @param jitter maximum random delay added to each sleep time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withJitter(Duration jitter) {
+    Preconditions.checkNotNull(jitter);
+    if (backoffType != BackoffType.RANDOM) {
+      this.jitter = jitter;
+    }
+    return this;
+  }
+
+  /**
+   * Set maximum number of attempts before terminating the operation.
+   * @param maxAttempts number of attempts
+   * @return this policy instance
+   */
+  public TableRetryPolicy withStopAfterAttempts(int maxAttempts) {
+    Preconditions.checkArgument(maxAttempts >= 0);
+    this.maxAttempts = maxAttempts;
+    return this;
+  }
+
+  /**
+   * Set the maximum total delay (sleep time + execution time) before terminating the operation.
+   * @param maxDelay maximum total delay
+   * @return this policy instance
+   */
+  public TableRetryPolicy withStopAfterDelay(Duration maxDelay) {
+    Preconditions.checkNotNull(maxDelay);
+    this.maxDuration = maxDelay;
+    return this;
+  }
+
+  /**
+   * Set the predicate to use for identifying retriable exceptions. If specified, the table
+   * retry logic consults both this predicate and the table function, and a retry is
+   * attempted if either returns true.
+   * @param retryPredicate predicate for retriable exception identification
+   * @return this policy instance
+   */
+  public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) {
+    Preconditions.checkNotNull(retryPredicate);
+    this.retryPredicate = retryPredicate;
+    return this;
+  }
+
+  /**
+   * @return initial/fixed sleep time.
+   */
+  public Duration getSleepTime() {
+    return sleepTime;
+  }
+
+  /**
+   * @return lower bound of the sleep time for random backoff or null if {@code backoffType} is not {@code RANDOM}.
+   */
+  public Duration getRandomMin() {
+    return randomMin;
+  }
+
+  /**
+   * @return upper bound of the sleep time for random backoff or null if {@code backoffType} is not {@code RANDOM}.
+   */
+  public Duration getRandomMax() {
+    return randomMax;
+  }
+
+  /**
+   * @return exponential factor for exponential backoff.
+   */
+  public double getExponentialFactor() {
+    return exponentialFactor;
+  }
+
+  /**
+   * @return maximum sleep time for exponential backoff or null if {@code backoffType} is not {@code EXPONENTIAL}.
+   */
+  public Duration getExponentialMaxSleep() {
+    return exponentialMaxSleep;
+  }
+
+  /**
+   * Jitter introduces randomness to the sleep time.
+   * @return maximum jitter added to each backoff or null if not set.
+   */
+  public Duration getJitter() {
+    return jitter;
+  }
+
+  /**
+   * Termination after a fixed number of attempts.
+   * @return maximum number of attempts without success before giving up the operation or null if not set.
+   */
+  public Integer getMaxAttempts() {
+    return maxAttempts;
+  }
+
+  /**
+   * Termination after a fixed duration.
+   * @return maximum duration without success before giving up the operation or null if not set.
+   */
+  public Duration getMaxDuration() {
+    return maxDuration;
+  }
+
+  /**
+   * @return type of the backoff.
+   */
+  public BackoffType getBackoffType() {
+    return backoffType;
+  }
+
+  /**
+   * @return Custom predicate for retriable exception identification or null if not specified.
+   */
+  public RetryPredicate getRetryPredicate() {
+    return retryPredicate;
+  }
+}
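
As a usage sketch, a policy assembled from the fluent setters above; the concrete durations,
attempt count, and IOException predicate are illustrative only, not part of this patch.

import java.io.IOException;
import java.time.Duration;

import org.apache.samza.table.retry.TableRetryPolicy;

public class RetryPolicyExample {
  public static TableRetryPolicy exampleRetryPolicy() {
    // Exponential backoff from 100 ms, doubling up to 5 s, with up to 50 ms of jitter,
    // giving up after 10 attempts; IOExceptions are retriable in addition to whatever
    // the table function itself reports via isRetriable().
    return new TableRetryPolicy()
        .withExponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(5), 2.0)
        .withJitter(Duration.ofMillis(50))
        .withStopAfterAttempts(10)
        .withRetryPredicate(ex -> ex instanceof IOException);
  }
}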

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
new file mode 100644
index 0000000..a7b66e5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+
+import org.apache.samza.SamzaException;
+
+
+public final class SerdeUtils {
+  /**
+   * Helper method to serialize Java objects as Base64 strings
+   * @param name name of the object (for error reporting)
+   * @param object object to be serialized
+   * @return Base64 representation of the object
+   * @param <T> type of the object
+   */
+  public static <T> String serialize(String name, T object) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(object);
+      return Base64.getEncoder().encodeToString(baos.toByteArray());
+    } catch (IOException e) {
+      throw new SamzaException("Failed to serialize " + name, e);
+    }
+  }
+
+  /**
+   * Helper method to deserialize Java objects from Base64 strings
+   * @param name name of the object (for error reporting)
+   * @param strObject base64 string of the serialized object
+   * @return deserialized object instance
+   * @param <T> type of the object
+   */
+  public static <T> T deserialize(String name, String strObject) {
+    try {
+      byte [] bytes = Base64.getDecoder().decode(strObject);
+      return (T) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
+    } catch (Exception e) {
+      String errMsg = "Failed to deserialize " + name;
+      throw new SamzaException(errMsg, e);
+    }
+  }
+}
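
A quick round-trip sketch of the two helpers above; the retry policy is used here simply as
an example of a serializable object, and the class name is illustrative.

import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.SerdeUtils;

public class SerdeUtilsExample {
  public static void main(String[] args) {
    TableRetryPolicy policy = new TableRetryPolicy().withStopAfterAttempts(3);

    // Serialize the object to a Base64 string, e.g. for embedding in table configuration ...
    String encoded = SerdeUtils.serialize("retry policy", policy);

    // ... and restore it later from that string.
    TableRetryPolicy restored = SerdeUtils.deserialize("retry policy", encoded);
    System.out.println("Max attempts after round trip: " + restored.getMaxAttempts());
  }
}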

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
new file mode 100644
index 0000000..ea9acbd
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestTableRateLimiter {
+  private static final String DEFAULT_TAG = "mytag";
+
+  public TableRateLimiter<String, String> getThrottler() {
+    return getThrottler(DEFAULT_TAG);
+  }
+
+  public TableRateLimiter<String, String> getThrottler(String tag) {
+    TableRateLimiter.CreditFunction<String, String> credFn = (key, value) -> {
+      // 3 credits for a non-null key plus 3 credits for a non-null value
+      int credits = key == null ? 0 : 3;
+      credits += value == null ? 0 : 3;
+      return credits;
+    };
+    RateLimiter rateLimiter = mock(RateLimiter.class);
+    doReturn(Collections.singleton(DEFAULT_TAG)).when(rateLimiter).getSupportedTags();
+    TableRateLimiter<String, String> rateLimitHelper = new TableRateLimiter<>("foo", rateLimiter, credFn, tag);
+    Timer timer = mock(Timer.class);
+    rateLimitHelper.setTimerMetric(timer);
+    return rateLimitHelper;
+  }
+
+  @Test
+  public void testCreditKeyOnly() {
+    TableRateLimiter<String, String> rateLimitHelper = getThrottler();
+    Assert.assertEquals(3, rateLimitHelper.getCredits("abc", null));
+  }
+
+  @Test
+  public void testCreditKeyValue() {
+    TableRateLimiter<String, String> rateLimitHelper = getThrottler();
+    Assert.assertEquals(6, rateLimitHelper.getCredits("abc", "efg"));
+  }
+
+  @Test
+  public void testCreditKeys() {
+    TableRateLimiter<String, String> rateLimitHelper = getThrottler();
+    Assert.assertEquals(9, rateLimitHelper.getCredits(Arrays.asList("abc", "efg", "hij")));
+  }
+
+  @Test
+  public void testCreditEntries() {
+    TableRateLimiter<String, String> rateLimitHelper = getThrottler();
+    Assert.assertEquals(12, rateLimitHelper.getEntryCredits(
+        Arrays.asList(new Entry<>("abc", "efg"), new Entry<>("hij", "lmn"))));
+  }
+
+  @Test
+  public void testThrottle() {
+    TableRateLimiter<String, String> rateLimitHelper = getThrottler();
+    Timer timer = mock(Timer.class);
+    rateLimitHelper.setTimerMetric(timer);
+    rateLimitHelper.throttle("foo");
+    verify(rateLimitHelper.rateLimiter, times(1)).acquire(anyMap());
+    verify(timer, times(1)).update(anyLong());
+  }
+
+  @Test
+  public void testThrottleUnknownTag() {
+    TableRateLimiter<String, String> rateLimitHelper = getThrottler("unknown_tag");
+    rateLimitHelper.throttle("foo");
+    verify(rateLimitHelper.rateLimiter, times(0)).acquire(anyMap());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
index 359c8f0..d3c283c 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -40,7 +40,7 @@ import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
+import org.apache.samza.table.descriptors.HybridTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -362,9 +362,9 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
         || tableDescriptors.get(tableId) == tableDescriptor,
         String.format("Cannot add multiple table descriptors with the same tableId: %s", tableId));
 
-    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+    if (tableDescriptor instanceof HybridTableDescriptor) {
       List<? extends TableDescriptor> tableDescs =
-          ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+          ((HybridTableDescriptor) tableDescriptor).getTableDescriptors();
       tableDescs.forEach(td -> addTableDescriptor(td));
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
index 06f5606..3a3005a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+
 /**
  * A helper class for handling table configuration
  */
@@ -59,9 +60,9 @@ public class JavaTableConfig extends MapConfig {
   }
 
   /**
-   * Get the {@link org.apache.samza.table.descriptors.TableProviderFactory} class for a table
+   * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
    * @param tableId Id of the table
-   * @return the {@link org.apache.samza.table.descriptors.TableProviderFactory} class name
+   * @return the {@link org.apache.samza.table.TableProviderFactory} class name
    */
   public String getTableProviderFactory(String tableId) {
     return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);