You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/01/17 19:56:13 UTC

[2/2] samza git commit: y

y

Currently, different features such as rate limiting, retries, etc. are implemented together in the remote table; the implementation will become more and more complex and error-prone as we add more functionality. It is necessary to separate these features into their own classes/modules. This is also a necessary step as we move on to add batching support.

 - Converted to a composition-based implementation
 - The remote table would only provide core functionality and basic metrics
 - Split ReadWriteTable into sync and async parts (AsyncReadWriteTable)
 - Introduced AsyncRateLimitedTable, AsyncRetriableTable

Author: Wei Song <ws...@linkedin.com>

Reviewers: Jagadish Venkatraman <jv...@linkedin.com>

Closes #880 from weisong44/SAMZA-2066


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4289ca91
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4289ca91
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4289ca91

Branch: refs/heads/master
Commit: 4289ca91d674fe69fa3f4ccd79dbf130b02fb6ef
Parents: 94ccc32
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Jan 17 11:56:05 2019 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Jan 17 11:56:05 2019 -0800

----------------------------------------------------------------------
 .../apache/samza/table/AsyncReadWriteTable.java | 108 +++++
 .../org/apache/samza/table/ReadWriteTable.java  |  81 +---
 .../samza/table/remote/TableRateLimiter.java    |  20 +-
 .../table/remote/TestTableRateLimiter.java      |   9 +-
 .../apache/samza/table/BaseReadWriteTable.java  |   9 +
 .../table/ratelimit/AsyncRateLimitedTable.java  | 148 +++++++
 .../samza/table/remote/AsyncRemoteTable.java    | 104 +++++
 .../apache/samza/table/remote/RemoteTable.java  | 268 ++++---------
 .../samza/table/remote/RemoteTableProvider.java |  55 +--
 .../samza/table/retry/AsyncRetriableTable.java  | 158 ++++++++
 .../org/apache/samza/testUtils/TestUtils.java   |  57 +++
 .../samza/table/caching/TestCachingTable.java   |  12 +-
 .../ratelimit/TestAsyncRateLimitedTable.java    | 160 ++++++++
 .../table/remote/TestAsyncRemoteTable.java      | 141 +++++++
 .../samza/table/remote/TestRemoteTable.java     |  59 ++-
 .../descriptors/TestRemoteTableDescriptor.java  |  27 +-
 .../table/retry/TestAsyncRetriableTable.java    | 392 +++++++++++++++++++
 .../retry/TestRetriableTableFunctions.java      |   2 +-
 .../org/apache/samza/storage/kv/LocalTable.java |   4 -
 .../test/table/TestRemoteTableEndToEnd.java     |  16 +-
 20 files changed, 1459 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
new file mode 100644
index 0000000..dc976b5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+
+
+/**
+ * A table that supports asynchronous get, put and delete by one or more keys
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public interface AsyncReadWriteTable<K, V> extends Table {
+  /**
+   * Asynchronously gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return completableFuture for the requested value
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  CompletableFuture<V> getAsync(K key);
+
+  /**
+   * Asynchronously gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return completableFuture for the requested entries
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
+
+  /**
+   * Asynchronously updates the mapping of the specified key-value pair;
+   * Associates the specified {@code key} with the specified {@code value}.
+   * The key is deleted from the table if value is {@code null}.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param value the value with which the specified {@code key} is to be associated.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> putAsync(K key, V value);
+
+  /**
+   * Asynchronously updates the mappings of the specified key-value {@code entries}.
+   * A key is deleted from the table if its corresponding value is {@code null}.
+   *
+   * @param entries the updated mappings to put into this table.
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries);
+
+  /**
+   * Asynchronously deletes the mapping for the specified {@code key} from this table (if such mapping exists).
+   * @param key the key for which the mapping is to be deleted.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> deleteAsync(K key);
+
+  /**
+   * Asynchronously deletes the mappings for the specified {@code keys} from this table.
+   * @param keys the keys for which the mappings are to be deleted.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> deleteAllAsync(List<K> keys);
+
+  /**
+   * Initializes the table during container initialization.
+   * Guaranteed to be invoked as the first operation on the table.
+   * @param context {@link Context} corresponding to this table
+   */
+  default void init(Context context) {
+  }
+
+  /**
+   * Flushes the underlying store of this table, if applicable.
+   */
+  void flush();
+
+  /**
+   * Closes the table and releases any resources it acquired.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
index ffb87a4..a7dad8f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
@@ -20,29 +20,18 @@ package org.apache.samza.table;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 
 /**
- *
- * A table that supports get, put and delete by one or more keys
+ * A table that supports synchronous and asynchronous get, put and delete by one or more keys
  *
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
 @InterfaceStability.Unstable
-public interface ReadWriteTable<K, V> extends Table {
-
-  /**
-   * Initializes the table during container initialization.
-   * Guaranteed to be invoked as the first operation on the table.
-   * @param context {@link Context} corresponding to this table
-   */
-  default void init(Context context) {
-  }
+public interface ReadWriteTable<K, V> extends AsyncReadWriteTable<K, V> {
 
   /**
    * Gets the value associated with the specified {@code key}.
@@ -54,15 +43,6 @@ public interface ReadWriteTable<K, V> extends Table {
   V get(K key);
 
   /**
-   * Asynchronously gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return completableFuture for the requested value
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  CompletableFuture<V> getAsync(K key);
-
-  /**
    * Gets the values with which the specified {@code keys} are associated.
    *
    * @param keys the keys with which the associated values are to be fetched.
@@ -72,15 +52,6 @@ public interface ReadWriteTable<K, V> extends Table {
   Map<K, V> getAll(List<K> keys);
 
   /**
-   * Asynchronously gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return completableFuture for the requested entries
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
-
-  /**
    * Updates the mapping of the specified key-value pair;
    * Associates the specified {@code key} with the specified {@code value}.
    *
@@ -93,18 +64,6 @@ public interface ReadWriteTable<K, V> extends Table {
   void put(K key, V value);
 
   /**
-   * Asynchronously updates the mapping of the specified key-value pair;
-   * Associates the specified {@code key} with the specified {@code value}.
-   * The key is deleted from the table if value is {@code null}.
-   *
-   * @param key the key with which the specified {@code value} is to be associated.
-   * @param value the value with which the specified {@code key} is to be associated.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   * @return CompletableFuture for the operation
-   */
-  CompletableFuture<Void> putAsync(K key, V value);
-
-  /**
    * Updates the mappings of the specified key-value {@code entries}.
    *
    * A key is deleted from the table if its corresponding value is {@code null}.
@@ -115,16 +74,6 @@ public interface ReadWriteTable<K, V> extends Table {
   void putAll(List<Entry<K, V>> entries);
 
   /**
-   * Asynchronously updates the mappings of the specified key-value {@code entries}.
-   * A key is deleted from the table if its corresponding value is {@code null}.
-   *
-   * @param entries the updated mappings to put into this table.
-   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
-   * @return CompletableFuture for the operation
-   */
-  CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries);
-
-  /**
    * Deletes the mapping for the specified {@code key} from this table (if such mapping exists).
    *
    * @param key the key for which the mapping is to be deleted.
@@ -133,36 +82,10 @@ public interface ReadWriteTable<K, V> extends Table {
   void delete(K key);
 
   /**
-   * Asynchronously deletes the mapping for the specified {@code key} from this table (if such mapping exists).
-   * @param key the key for which the mapping is to be deleted.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   * @return CompletableFuture for the operation
-   */
-  CompletableFuture<Void> deleteAsync(K key);
-
-  /**
    * Deletes the mappings for the specified {@code keys} from this table.
    *
    * @param keys the keys for which the mappings are to be deleted.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
   void deleteAll(List<K> keys);
-
-  /**
-   * Asynchronously deletes the mappings for the specified {@code keys} from this table.
-   * @param keys the keys for which the mappings are to be deleted.
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   * @return CompletableFuture for the operation
-   */
-  CompletableFuture<Void> deleteAllAsync(List<K> keys);
-
-  /**
-   * Flushes the underlying store of this table, if applicable.
-   */
-  void flush();
-
-  /**
-   * Close the table and release any resources acquired
-   */
-  void close();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
index 37d6385..0758dd2 100644
--- a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
@@ -27,8 +27,6 @@ import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.util.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -43,10 +41,8 @@ import com.google.common.base.Preconditions;
  * @param <V> type of the table record
  */
 public class TableRateLimiter<K, V> {
-  private static final Logger LOG = LoggerFactory.getLogger(TableRateLimiter.class);
 
   private final String tag;
-  private final boolean rateLimited;
   private final CreditFunction<K, V> creditFn;
 
   @VisibleForTesting
@@ -79,11 +75,12 @@ public class TableRateLimiter<K, V> {
    * @param tag tag to be used with the rate limiter
    */
   public TableRateLimiter(String tableId, RateLimiter rateLimiter, CreditFunction<K, V> creditFn, String tag) {
+    Preconditions.checkNotNull(rateLimiter);
+    Preconditions.checkArgument(rateLimiter.getSupportedTags().contains(tag),
+        String.format("Rate limiter for table %s doesn't support %s", tableId, tag));
     this.rateLimiter = rateLimiter;
     this.creditFn = creditFn;
     this.tag = tag;
-    this.rateLimited = rateLimiter != null && rateLimiter.getSupportedTags().contains(tag);
-    LOG.info("Rate limiting is {} for {}", rateLimited ? "enabled" : "disabled", tableId);
   }
 
   /**
@@ -116,10 +113,6 @@ public class TableRateLimiter<K, V> {
   }
 
   private void throttle(int credits) {
-    if (!rateLimited) {
-      return;
-    }
-
     long startNs = System.nanoTime();
     rateLimiter.acquire(Collections.singletonMap(tag, credits));
     if (waitTimeMetric != null) {
@@ -159,11 +152,4 @@ public class TableRateLimiter<K, V> {
   public void throttleRecords(Collection<Entry<K, V>> records) {
     throttle(getEntryCredits(records));
   }
-
-  /**
-   * @return whether rate limiting is enabled for the associated table
-   */
-  public boolean isRateLimited() {
-    return rateLimited;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
index 3235d5a..f3780bf 100644
--- a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -28,9 +28,7 @@ import org.apache.samza.util.RateLimiter;
 import org.junit.Assert;
 import org.junit.Test;
 
-
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -52,7 +50,7 @@ public class TestTableRateLimiter {
       return credits;
     };
     RateLimiter rateLimiter = mock(RateLimiter.class);
-    doReturn(Collections.singleton(DEFAULT_TAG)).when(rateLimiter).getSupportedTags();
+    doReturn(Collections.singleton(tag)).when(rateLimiter).getSupportedTags();
     TableRateLimiter<String, String> rateLimitHelper = new TableRateLimiter<>("foo", rateLimiter, credFn, tag);
     Timer timer = mock(Timer.class);
     rateLimitHelper.setTimerMetric(timer);
@@ -98,6 +96,7 @@ public class TestTableRateLimiter {
   public void testThrottleUnknownTag() {
     TableRateLimiter<String, String> rateLimitHelper = getThrottler("unknown_tag");
     rateLimitHelper.throttle("foo");
-    verify(rateLimitHelper.rateLimiter, times(0)).acquire(anyMap());
+    verify(rateLimitHelper.rateLimiter, times(0)).acquire(anyInt());
+    verify(rateLimitHelper.rateLimiter, times(1)).acquire(anyMap());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
index cef224d..1cb914f 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
@@ -19,6 +19,7 @@
 package org.apache.samza.table;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.table.utils.TableMetrics;
@@ -66,4 +67,12 @@ abstract public class BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> {
   public String getTableId() {
     return tableId;
   }
+
+  public interface Func0 {
+    void apply();
+  }
+
+  public interface Func1<T> {
+    CompletableFuture<T> apply();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/table/ratelimit/AsyncRateLimitedTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/ratelimit/AsyncRateLimitedTable.java b/samza-core/src/main/java/org/apache/samza/table/ratelimit/AsyncRateLimitedTable.java
new file mode 100644
index 0000000..69f3dd3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/ratelimit/AsyncRateLimitedTable.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.ratelimit;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * A composable read and/or write rate limited asynchronous table implementation
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class AsyncRateLimitedTable<K, V> implements AsyncReadWriteTable<K, V> {
+
+  private final String tableId;
+  private final AsyncReadWriteTable<K, V> table;
+  private final TableRateLimiter<K, V> readRateLimiter;
+  private final TableRateLimiter<K, V> writeRateLimiter;
+  private final ExecutorService rateLimitingExecutor;
+
+  public AsyncRateLimitedTable(String tableId, AsyncReadWriteTable<K, V> table, TableRateLimiter<K, V> readRateLimiter,
+      TableRateLimiter<K, V> writeRateLimiter, ExecutorService rateLimitingExecutor) {
+    Preconditions.checkNotNull(tableId, "null tableId");
+    Preconditions.checkNotNull(table, "null table");
+    Preconditions.checkNotNull(rateLimitingExecutor, "null rateLimitingExecutor");
+    Preconditions.checkArgument(readRateLimiter != null || writeRateLimiter != null,
+        "both readRateLimiter and writeRateLimiter are null");
+    this.tableId = tableId;
+    this.table = table;
+    this.readRateLimiter = readRateLimiter;
+    this.writeRateLimiter = writeRateLimiter;
+    this.rateLimitingExecutor = rateLimitingExecutor;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return isReadRateLimited()
+      ? CompletableFuture
+          .runAsync(() -> readRateLimiter.throttle(key), rateLimitingExecutor)
+          .thenCompose((r) -> table.getAsync(key))
+      : table.getAsync(key);
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    return isReadRateLimited()
+      ? CompletableFuture
+          .runAsync(() -> readRateLimiter.throttle(keys), rateLimitingExecutor)
+          .thenCompose((r) -> table.getAllAsync(keys))
+      : table.getAllAsync(keys);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    return isWriteRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> writeRateLimiter.throttle(key, value), rateLimitingExecutor)
+            .thenCompose((r) -> table.putAsync(key, value))
+        : table.putAsync(key, value);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    return isWriteRateLimited()
+      ? CompletableFuture
+          .runAsync(() -> writeRateLimiter.throttleRecords(entries), rateLimitingExecutor)
+          .thenCompose((r) -> table.putAllAsync(entries))
+      : table.putAllAsync(entries);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    return isWriteRateLimited()
+      ? CompletableFuture
+          .runAsync(() -> writeRateLimiter.throttle(key), rateLimitingExecutor)
+          .thenCompose((r) -> table.deleteAsync(key))
+      : table.deleteAsync(key);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    return isWriteRateLimited()
+      ? CompletableFuture
+          .runAsync(() -> writeRateLimiter.throttle(keys), rateLimitingExecutor)
+          .thenCompose((r) -> table.deleteAllAsync(keys))
+      : table.deleteAllAsync(keys);
+  }
+
+  @Override
+  public void init(Context context) {
+    table.init(context);
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+      if (isReadRateLimited()) {
+        readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+      }
+      if (isWriteRateLimited()) {
+        writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+      }
+    }
+  }
+
+  @Override
+  public void flush() {
+    table.flush();
+  }
+
+  @Override
+  public void close() {
+    table.close();
+  }
+
+  private boolean isReadRateLimited() {
+    return readRateLimiter != null;
+  }
+
+  private boolean isWriteRateLimited() {
+    return writeRateLimiter != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
new file mode 100644
index 0000000..d4dbc03
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.remote;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+
+
+/**
+ * A composable asynchronous table implementation that delegates read/write operations
+ * to the underlying table read and write functions.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class AsyncRemoteTable<K, V> implements AsyncReadWriteTable<K, V> {
+
+  private final TableReadFunction<K, V> readFn;
+  private final TableWriteFunction<K, V> writeFn;
+
+  public AsyncRemoteTable(TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
+    Preconditions.checkNotNull(readFn, "null readFn");
+    this.readFn = readFn;
+    this.writeFn = writeFn;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return readFn.getAsync(key);
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    return readFn.getAllAsync(keys);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    Preconditions.checkNotNull(writeFn, "null writeFn");
+    return writeFn.putAsync(key, value);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    Preconditions.checkNotNull(writeFn, "null writeFn");
+    return writeFn.putAllAsync(entries);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    Preconditions.checkNotNull(writeFn, "null writeFn");
+    return writeFn.deleteAsync(key);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    Preconditions.checkNotNull(writeFn, "null writeFn");
+    return writeFn.deleteAllAsync(keys);
+  }
+
+  @Override
+  public void init(Context context) {
+    readFn.init(context);
+    if (writeFn != null) {
+      writeFn.init(context);
+    }
+  }
+
+  @Override
+  public void flush() {
+    if (writeFn != null) {
+      writeFn.flush();
+    }
+  }
+
+  @Override
+  public void close() {
+    readFn.close();
+    if (writeFn != null) {
+      writeFn.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
index 5b9b289..8301661 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -19,23 +19,24 @@
 
 package org.apache.samza.table.remote;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.util.Collection;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.BiFunction;
-import java.util.function.Function;
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
 import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
+import org.apache.samza.table.retry.AsyncRetriableTable;
+import org.apache.samza.table.retry.TableRetryPolicy;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -76,12 +77,22 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
-  protected final ExecutorService callbackExecutor;
-  protected final ExecutorService tableExecutor;
+  // Read/write functions
   protected final TableReadFunction<K, V> readFn;
   protected final TableWriteFunction<K, V> writeFn;
+  // Rate limiting
   protected final TableRateLimiter<K, V> readRateLimiter;
-  protected final TableRateLimiter writeRateLimiter;
+  protected final TableRateLimiter<K, V> writeRateLimiter;
+  protected final ExecutorService rateLimitingExecutor;
+  // Retries
+  protected final TableRetryPolicy readRetryPolicy;
+  protected final TableRetryPolicy writeRetryPolicy;
+  protected final ScheduledExecutorService retryExecutor;
+  // Other
+  protected final ExecutorService callbackExecutor;
+
+  // The async table to delegate to
+  protected final AsyncReadWriteTable<K, V> asyncTable;
 
   /**
    * Construct a RemoteTable instance
@@ -90,33 +101,46 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
    * @param writeFn {@link TableWriteFunction} for write operations
    * @param readRateLimiter helper for read rate limiting
    * @param writeRateLimiter helper for write rate limiting
-   * @param tableExecutor executor for issuing async requests
+   * @param rateLimitingExecutor executor for executing rate limiting
+   * @param readRetryPolicy read retry policy
+   * @param writeRetryPolicy write retry policy
+   * @param retryExecutor executor for invoking retries
    * @param callbackExecutor executor for invoking async callbacks
    */
-  public RemoteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
-      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
-      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
+  public RemoteTable(
+      String tableId,
+      TableReadFunction readFn,
+      TableWriteFunction writeFn,
+      TableRateLimiter<K, V> readRateLimiter,
+      TableRateLimiter<K, V> writeRateLimiter,
+      ExecutorService rateLimitingExecutor,
+      TableRetryPolicy readRetryPolicy,
+      TableRetryPolicy writeRetryPolicy,
+      ScheduledExecutorService retryExecutor,
+      ExecutorService callbackExecutor) {
+
     super(tableId);
-    Preconditions.checkNotNull(readFn, "null read function");
+    Preconditions.checkNotNull(readFn, "null readFn");
+
     this.readFn = readFn;
     this.writeFn = writeFn;
     this.readRateLimiter = readRateLimiter;
     this.writeRateLimiter = writeRateLimiter;
-    this.tableExecutor = tableExecutor;
+    this.rateLimitingExecutor = rateLimitingExecutor;
+    this.readRetryPolicy = readRetryPolicy;
+    this.writeRetryPolicy = writeRetryPolicy;
     this.callbackExecutor = callbackExecutor;
-  }
+    this.retryExecutor = retryExecutor;
 
-  @Override
-  public void init(Context context) {
-    super.init(context);
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
-      readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
-      if (writeRateLimiter != null) {
-        writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
-      }
+    AsyncReadWriteTable table = new AsyncRemoteTable(readFn, writeFn);
+    if (readRateLimiter != null || writeRateLimiter != null) {
+      table = new AsyncRateLimitedTable(tableId, table, readRateLimiter, writeRateLimiter, rateLimitingExecutor);
+    }
+    if (readRetryPolicy != null || writeRetryPolicy != null) {
+      table = new AsyncRetriableTable(tableId, table, readRetryPolicy, writeRetryPolicy, retryExecutor, readFn, writeFn);
     }
+
+    asyncTable = table;
   }
 
   @Override
@@ -130,8 +154,8 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
 
   @Override
   public CompletableFuture<V> getAsync(K key) {
-    Preconditions.checkNotNull(key);
-    return execute(readRateLimiter, key, readFn::getAsync, metrics.numGets, metrics.getNs)
+    Preconditions.checkNotNull(key, "null key");
+    return instrument(() -> asyncTable.getAsync(key), metrics.numGets, metrics.getNs)
         .handle((result, e) -> {
             if (e != null) {
               throw new SamzaException("Failed to get the records for " + key, e);
@@ -154,11 +178,11 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
 
   @Override
   public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    Preconditions.checkNotNull(keys);
+    Preconditions.checkNotNull(keys, "null keys");
     if (keys.isEmpty()) {
       return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
     }
-    return execute(readRateLimiter, keys, readFn::getAllAsync, metrics.numGetAlls, metrics.getAllNs)
+    return instrument(() -> asyncTable.getAllAsync(keys), metrics.numGetAlls, metrics.getAllNs)
         .handle((result, e) -> {
             if (e != null) {
               throw new SamzaException("Failed to get the records for " + keys, e);
@@ -180,12 +204,12 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
   @Override
   public CompletableFuture<Void> putAsync(K key, V value) {
     Preconditions.checkNotNull(writeFn, "null write function");
-    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(key, "null key");
     if (value == null) {
       return deleteAsync(key);
     }
 
-    return execute(writeRateLimiter, key, value, writeFn::putAsync, metrics.numPuts, metrics.putNs)
+    return instrument(() -> asyncTable.putAsync(key, value), metrics.numPuts, metrics.putNs)
         .exceptionally(e -> {
             throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
           });
@@ -202,25 +226,27 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
 
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
+
     Preconditions.checkNotNull(writeFn, "null write function");
-    Preconditions.checkNotNull(records);
+    Preconditions.checkNotNull(records, "null records");
+
     if (records.isEmpty()) {
       return CompletableFuture.completedFuture(null);
     }
 
     List<K> deleteKeys = records.stream()
         .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
-
-    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
-        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
-
     List<Entry<K, V>> putRecords = records.stream()
         .filter(e -> e.getValue() != null).collect(Collectors.toList());
 
+    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
+        ? CompletableFuture.completedFuture(null)
+        : deleteAllAsync(deleteKeys);
+
     // Return the combined future
     return CompletableFuture.allOf(
         deleteFuture,
-        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, metrics.numPutAlls, metrics.putAllNs))
+        instrument(() -> asyncTable.putAllAsync(putRecords), metrics.numPutAlls, metrics.putAllNs))
         .exceptionally(e -> {
             String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
             throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
@@ -239,8 +265,8 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
     Preconditions.checkNotNull(writeFn, "null write function");
-    Preconditions.checkNotNull(key);
-    return execute(writeRateLimiter, key, writeFn::deleteAsync, metrics.numDeletes, metrics.deleteNs)
+    Preconditions.checkNotNull(key, "null key");
+    return instrument(() -> asyncTable.deleteAsync(key), metrics.numDeletes, metrics.deleteNs)
         .exceptionally(e -> {
             throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
           });
@@ -258,94 +284,46 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
     Preconditions.checkNotNull(writeFn, "null write function");
-    Preconditions.checkNotNull(keys);
+    Preconditions.checkNotNull(keys, "null keys");
     if (keys.isEmpty()) {
       return CompletableFuture.completedFuture(null);
     }
 
-    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, metrics.numDeleteAlls, metrics.deleteAllNs)
+    return instrument(() -> asyncTable.deleteAllAsync(keys), metrics.numDeleteAlls, metrics.deleteAllNs)
         .exceptionally(e -> {
             throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
           });
   }
 
   @Override
-  public void flush() {
-    if (writeFn != null) {
-      try {
-        incCounter(metrics.numFlushes);
-        long startNs = clock.nanoTime();
-        writeFn.flush();
-        updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
-      } catch (Exception e) {
-        String errMsg = "Failed to flush remote store";
-        logger.error(errMsg, e);
-        throw new SamzaException(errMsg, e);
-      }
-    }
+  public void init(Context context) {
+    super.init(context);
+    asyncTable.init(context);
   }
 
   @Override
-  public void close() {
-    readFn.close();
-    if (writeFn != null) {
-      writeFn.close();
+  public void flush() {
+    try {
+      incCounter(metrics.numFlushes);
+      long startNs = clock.nanoTime();
+      asyncTable.flush();
+      updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = "Failed to flush remote store";
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
     }
   }
 
-  /**
-   * Execute an async request given a table key
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param method method to be executed
-   * @param counter count metric to be updated
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-        .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
-        .thenCompose((r) -> method.apply(key))
-        : method.apply(key);
-    return completeExecution(ioFuture, startNs, timer);
+  @Override
+  public void close() {
+    asyncTable.close();
   }
 
-  /**
-   * Execute an async request given a collection of table keys
-   * @param rateLimiter helper for rate limiting
-   * @param keys collection of keys
-   * @param method method to be executed
-   * @param counter count metric to be updated
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
-      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
+  protected <T> CompletableFuture<T> instrument(Func1<T> func, Counter counter, Timer timer) {
     incCounter(counter);
     final long startNs = clock.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-        .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
-        .thenCompose((r) -> method.apply(keys))
-        : method.apply(keys);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Complete the pending execution and update timer
-   * @param ioFuture the future to be executed
-   * @param startNs start time in nanosecond
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
+    CompletableFuture<T> ioFuture = func.apply();
     if (callbackExecutor != null) {
       ioFuture.thenApplyAsync(r -> {
           updateTimer(timer, clock.nanoTime() - startNs);
@@ -359,78 +337,4 @@ public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
     }
     return ioFuture;
   }
-
-  /**
-   * Execute an async request given a table record (key+value)
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param value value of the table record
-   * @param method method to be executed
-   * @param counter count metric to be updated
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
-            .thenCompose((r) -> method.apply(key, value))
-        : method.apply(key, value);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Execute an async request given a collection of table records
-   * @param rateLimiter helper for rate limiting
-   * @param records list of records
-   * @param method method to be executed
-   * @param counter count metric to be updated
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
-      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
-      Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
-            .thenCompose((r) -> method.apply(records))
-        : method.apply(records);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  @VisibleForTesting
-  public ExecutorService getCallbackExecutor() {
-    return callbackExecutor;
-  }
-
-  @VisibleForTesting
-  public ExecutorService getTableExecutor() {
-    return tableExecutor;
-  }
-
-  @VisibleForTesting
-  public TableReadFunction<K, V> getReadFn() {
-    return readFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter<K, V> getReadRateLimiter() {
-    return readRateLimiter;
-  }
-
-  @VisibleForTesting
-  public TableWriteFunction<K, V> getWriteFn() {
-    return writeFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter getWriteRateLimiter() {
-    return writeRateLimiter;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 8b6bc1a..1716244 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -23,12 +23,9 @@ import com.google.common.base.Preconditions;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.BaseTableProvider;
 import org.apache.samza.table.utils.SerdeUtils;
-import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.util.RateLimiter;
 
 import java.util.ArrayList;
@@ -51,7 +48,7 @@ public class RemoteTableProvider extends BaseTableProvider {
    * are shared by both read/write operations such that tables of the same tableId all share
    * the same set of executors globally whereas table itself is per-task.
    */
-  private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
+  private static Map<String, ExecutorService> rateLimitingExecutors = new ConcurrentHashMap<>();
   private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
   private static ScheduledExecutorService retryExecutor;
 
@@ -72,18 +69,12 @@ public class RemoteTableProvider extends BaseTableProvider {
     if (rateLimiter != null) {
       rateLimiter.init(this.context);
     }
-    TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
-    TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
 
+    TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
+    TableRateLimiter readRateLimiter = rateLimiter != null && rateLimiter.getSupportedTags().contains(RemoteTableDescriptor.RL_READ_TAG)
+        ? new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG)
+        : null;
     TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
-    if (readRetryPolicy != null) {
-      if (retryExecutor == null) {
-        retryExecutor = createRetryExecutor();
-      }
-      readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
-    }
-
-    boolean isRateLimited = readRateLimiter.isRateLimited();
 
     // Write part
     TableWriteFunction writeFn = getWriteFn(tableConfig);
@@ -91,15 +82,14 @@ public class RemoteTableProvider extends BaseTableProvider {
     TableRetryPolicy writeRetryPolicy = null;
     if (writeFn != null) {
       TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
-      writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
-      isRateLimited |= writeRateLimiter.isRateLimited();
+      writeRateLimiter = rateLimiter != null && rateLimiter.getSupportedTags().contains(RemoteTableDescriptor.RL_WRITE_TAG)
+          ? new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG)
+          : null;
       writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
-      if (writeRetryPolicy != null) {
-        if (retryExecutor == null) {
-          retryExecutor = createRetryExecutor();
-        }
-        writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
-      }
+    }
+
+    if (readRetryPolicy != null || writeRetryPolicy != null) {
+      retryExecutor = createRetryExecutor();
     }
 
     // Optional executor for future callback/completion. Shared by both read and write operations.
@@ -114,8 +104,9 @@ public class RemoteTableProvider extends BaseTableProvider {
             }));
     }
 
+    boolean isRateLimited = readRateLimiter != null || writeRateLimiter != null;
     if (isRateLimited) {
-      tableExecutors.computeIfAbsent(tableId, (arg) ->
+      rateLimitingExecutors.computeIfAbsent(tableId, (arg) ->
           Executors.newSingleThreadExecutor(runnable -> {
               Thread thread = new Thread(runnable);
               thread.setName("table-" + tableId + "-async-executor");
@@ -124,17 +115,11 @@ public class RemoteTableProvider extends BaseTableProvider {
             }));
     }
 
-    RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter,
-        writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
-
-    TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
-    if (readRetryPolicy != null) {
-      ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
-    }
-    if (writeRetryPolicy != null) {
-      ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
-    }
-
+    RemoteTable table = new RemoteTable(tableId,
+        readFn, writeFn,
+        readRateLimiter, writeRateLimiter, rateLimitingExecutors.get(tableId),
+        readRetryPolicy, writeRetryPolicy, retryExecutor,
+        callbackExecutors.get(tableId));
     table.init(this.context);
     tables.add(table);
     return table;
@@ -144,7 +129,7 @@ public class RemoteTableProvider extends BaseTableProvider {
   public void close() {
     super.close();
     tables.forEach(t -> t.close());
-    tableExecutors.values().forEach(e -> e.shutdown());
+    rateLimitingExecutors.values().forEach(e -> e.shutdown());
     callbackExecutors.values().forEach(e -> e.shutdown());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/table/retry/AsyncRetriableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/AsyncRetriableTable.java b/samza-core/src/main/java/org/apache/samza/table/retry/AsyncRetriableTable.java
new file mode 100644
index 0000000..ba39517
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/AsyncRetriableTable.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.retry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import java.util.function.Predicate;
+import net.jodah.failsafe.RetryPolicy;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+import static org.apache.samza.table.BaseReadWriteTable.Func1;
+
+
+/**
+ * An {@link AsyncReadWriteTable} decorator that re-attempts failed operations of the
+ * table it wraps, following the rules of a {@link TableRetryPolicy}. Read and write
+ * operations are governed by separate policies; an operation whose policy is absent
+ * is forwarded to the wrapped table without any retry handling.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class AsyncRetriableTable<K, V> implements AsyncReadWriteTable<K, V> {
+
+  private final String tableId;
+  private final AsyncReadWriteTable<K, V> table;
+  private final RetryPolicy readRetryPolicy;
+  private final RetryPolicy writeRetryPolicy;
+  private final ScheduledExecutorService retryExecutor;
+
+  @VisibleForTesting
+  RetryMetrics readRetryMetrics;
+  @VisibleForTesting
+  RetryMetrics writeRetryMetrics;
+
+  /**
+   * Wrap a table with retry handling.
+   *
+   * A policy only takes effect when its matching function is supplied as well; in that
+   * case a combined predicate (the function's {@code isRetriable} OR the predicate already
+   * configured on the policy) is installed on the supplied policy object before it is
+   * converted to a Failsafe {@link RetryPolicy}.
+   *
+   * @param tableId id of the table, must not be null
+   * @param table the table to delegate to, must not be null
+   * @param readRetryPolicy retry policy for read operations, may be null
+   * @param writeRetryPolicy retry policy for write operations, may be null
+   * @param retryExecutor executor handed to Failsafe for retries, must not be null
+   * @param readFn read function consulted for retriable exceptions, may be null
+   * @param writeFn write function consulted for retriable exceptions, may be null
+   */
+  public AsyncRetriableTable(String tableId, AsyncReadWriteTable<K, V> table,
+      TableRetryPolicy readRetryPolicy, TableRetryPolicy writeRetryPolicy, ScheduledExecutorService retryExecutor,
+      TableReadFunction readFn, TableWriteFunction writeFn) {
+
+    Preconditions.checkNotNull(tableId, "null tableId");
+    Preconditions.checkNotNull(table, "null table");
+    Preconditions.checkNotNull(retryExecutor, "null retryExecutor");
+    Preconditions.checkArgument(readRetryPolicy != null || writeRetryPolicy != null,
+        "both readRetryPolicy and writeRetryPolicy are null");
+
+    this.tableId = tableId;
+    this.table = table;
+    this.retryExecutor = retryExecutor;
+
+    // Read side: stays null (no retries) unless both the policy and the function are present
+    RetryPolicy failsafeReadPolicy = null;
+    if (readRetryPolicy != null && readFn != null) {
+      Predicate<Throwable> configuredReadPredicate = readRetryPolicy.getRetryPredicate();
+      readRetryPolicy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || configuredReadPredicate.test(ex));
+      failsafeReadPolicy = FailsafeAdapter.valueOf(readRetryPolicy);
+    }
+    this.readRetryPolicy = failsafeReadPolicy;
+
+    // Write side: same rule as the read side
+    RetryPolicy failsafeWritePolicy = null;
+    if (writeRetryPolicy != null && writeFn != null) {
+      Predicate<Throwable> configuredWritePredicate = writeRetryPolicy.getRetryPredicate();
+      writeRetryPolicy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || configuredWritePredicate.test(ex));
+      failsafeWritePolicy = FailsafeAdapter.valueOf(writeRetryPolicy);
+    }
+    this.writeRetryPolicy = failsafeWritePolicy;
+  }
+
+  /** Fetch a single record through the wrapped table, applying the read retry policy. */
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return doRead(() -> table.getAsync(key));
+  }
+
+  /** Fetch several records through the wrapped table, applying the read retry policy. */
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    return doRead(() -> table.getAllAsync(keys));
+  }
+
+  /** Store a single record through the wrapped table, applying the write retry policy. */
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    return doWrite(() -> table.putAsync(key, value));
+  }
+
+  /** Store several records through the wrapped table, applying the write retry policy. */
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    return doWrite(() -> table.putAllAsync(entries));
+  }
+
+  /** Delete a single record through the wrapped table, applying the write retry policy. */
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    return doWrite(() -> table.deleteAsync(key));
+  }
+
+  /** Delete several records through the wrapped table, applying the write retry policy. */
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    return doWrite(() -> table.deleteAllAsync(keys));
+  }
+
+  /**
+   * Initialize the wrapped table first, then create retry metrics for each side
+   * (read/write) that has a retry policy in effect.
+   */
+  @Override
+  public void init(Context context) {
+    table.init(context);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+    if (readRetryPolicy != null) {
+      readRetryMetrics = new RetryMetrics("reader", tableMetricsUtil);
+    }
+    if (writeRetryPolicy != null) {
+      writeRetryMetrics = new RetryMetrics("writer", tableMetricsUtil);
+    }
+  }
+
+  /** Forward the flush request to the wrapped table. */
+  @Override
+  public void flush() {
+    table.flush();
+  }
+
+  /** Forward the close request to the wrapped table. */
+  @Override
+  public void close() {
+    table.close();
+  }
+
+  /** Run a read operation under the read retry policy, if there is one. */
+  private <T> CompletableFuture<T> doRead(Func1<T> func) {
+    return withRetries(readRetryPolicy, readRetryMetrics, func);
+  }
+
+  /** Run a write operation under the write retry policy, if there is one. */
+  private <T> CompletableFuture<T> doWrite(Func1<T> func) {
+    return withRetries(writeRetryPolicy, writeRetryMetrics, func);
+  }
+
+  /** Invoke the operation directly when no policy applies, otherwise through Failsafe. */
+  private <T> CompletableFuture<T> withRetries(RetryPolicy policy, RetryMetrics metrics, Func1<T> func) {
+    if (policy == null) {
+      return func.apply();
+    }
+    return failsafe(policy, metrics, retryExecutor).future(() -> func.apply());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/main/java/org/apache/samza/testUtils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/testUtils/TestUtils.java b/samza-core/src/main/java/org/apache/samza/testUtils/TestUtils.java
new file mode 100644
index 0000000..9b17557
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/testUtils/TestUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Field;
+import org.apache.samza.SamzaException;
+
+
+/**
+ * A collection of utility methods used for testing
+ */
+public class TestUtils {
+  /**
+   * Get the value of the specified field from the target object. The field is looked up
+   * on the runtime class of the target first and then on each of its superclasses, so a
+   * field declared by a base class can be read from an instance of a subclass as well.
+   * The accessibility flag of the field is restored before this method returns.
+   *
+   * @param target target object
+   * @param fieldName name of the field
+   * @param <T> type of retrieved value
+   * @return value of the field
+   * @throws NullPointerException if target or fieldName is null
+   * @throws SamzaException if the field cannot be found or read
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T getFieldValue(Object target, String fieldName) {
+
+    Preconditions.checkNotNull(target, "Target object is null");
+    Preconditions.checkNotNull(fieldName, "Field name is null");
+
+    Field field = null;
+    Boolean prevAccessible = null;
+    try {
+      field = findField(target.getClass(), fieldName);
+      prevAccessible = field.isAccessible();
+      field.setAccessible(true);
+      // Unchecked by necessity: the caller chooses T, a mismatch surfaces at the call site
+      return (T) field.get(target);
+    } catch (Exception ex) {
+      throw new SamzaException(ex);
+    } finally {
+      // prevAccessible is only set once the field was found and inspected
+      if (field != null && prevAccessible != null) {
+        field.setAccessible(prevAccessible);
+      }
+    }
+  }
+
+  /**
+   * Locate a declared field by name, starting at the given class and moving up through
+   * its superclasses until a match is found.
+   */
+  private static Field findField(Class<?> type, String fieldName) throws NoSuchFieldException {
+    for (Class<?> current = type; current != null; current = current.getSuperclass()) {
+      try {
+        return current.getDeclaredField(fieldName);
+      } catch (NoSuchFieldException ignored) {
+        // Not declared at this level; continue with the superclass
+      }
+    }
+    throw new NoSuchFieldException(fieldName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index c304bfd..b5e35cf 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -283,8 +283,10 @@ public class TestCachingTable {
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
     final RemoteTable<String, String> remoteTable = new RemoteTable<>(
-        tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
-        Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
+        tableId + "-remote", readFn, writeFn,
+        rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
+        null, null, null,
+        Executors.newSingleThreadExecutor());
 
     final CachingTable<String, String> cachingTable = new CachingTable<>(
         tableId, remoteTable, guavaTable, false);
@@ -402,8 +404,10 @@ public class TestCachingTable {
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
 
     final RemoteTable<String, String> remoteTable = new RemoteTable<>(
-        tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
-        Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
+        tableId, readFn, writeFn,
+        rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
+        null, null, null,
+        Executors.newSingleThreadExecutor());
 
     final CachingTable<String, String> cachingTable = new CachingTable<>(
         tableId, remoteTable, guavaTable, false);

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/test/java/org/apache/samza/table/ratelimit/TestAsyncRateLimitedTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/ratelimit/TestAsyncRateLimitedTable.java b/samza-core/src/test/java/org/apache/samza/table/ratelimit/TestAsyncRateLimitedTable.java
new file mode 100644
index 0000000..7c646fb
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/ratelimit/TestAsyncRateLimitedTable.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.ratelimit;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.remote.AsyncRemoteTable;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.remote.TestRemoteTable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link AsyncRateLimitedTable}: constructor argument validation, throttling
+ * of read and write operations, and forwarding of flush/close to the wrapped table.
+ */
+public class TestAsyncRateLimitedTable {
+
+  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+  /**
+   * Stops the executor created for this test instance so that its worker thread does not
+   * outlive the test.
+   */
+  @After
+  public void tearDown() {
+    schedExec.shutdownNow();
+  }
+
+  /** Expects a NullPointerException when the table id is null. */
+  @Test(expected = NullPointerException.class)
+  public void testNotNullTableId() {
+    new AsyncRateLimitedTable(null, mock(AsyncReadWriteTable.class),
+        mock(TableRateLimiter.class), mock(TableRateLimiter.class),
+        mock(ScheduledExecutorService.class));
+  }
+
+  /** Expects a NullPointerException when the wrapped table is null. */
+  @Test(expected = NullPointerException.class)
+  public void testNotNullTable() {
+    new AsyncRateLimitedTable("t1", null,
+        mock(TableRateLimiter.class), mock(TableRateLimiter.class),
+        mock(ScheduledExecutorService.class));
+  }
+
+  /** Expects a NullPointerException when the rate limiting executor is null. */
+  @Test(expected = NullPointerException.class)
+  public void testNotNullRateLimitingExecutor() {
+    new AsyncRateLimitedTable("t1", mock(AsyncReadWriteTable.class),
+        mock(TableRateLimiter.class), mock(TableRateLimiter.class),
+        null);
+  }
+
+  /** Expects an IllegalArgumentException when both rate limiters are null. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testNotNullAtLeastOneRateLimiter() {
+    new AsyncRateLimitedTable("t1", mock(AsyncReadWriteTable.class),
+        null, null,
+        mock(ScheduledExecutorService.class));
+  }
+
+  /**
+   * Verifies that getAsync and getAllAsync each reach the read function once and each
+   * trigger exactly one call on the matching overload of the read rate limiter.
+   */
+  @Test
+  public void testGetThrottling() throws Exception {
+    TableRateLimiter readRateLimiter = mock(TableRateLimiter.class);
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any());
+    Map<String, String> result = new HashMap<>();
+    result.put("foo", "bar");
+    doReturn(CompletableFuture.completedFuture(result)).when(readFn).getAllAsync(any());
+    AsyncReadWriteTable delegate = new AsyncRemoteTable<>(readFn, null);
+    AsyncRateLimitedTable table = new AsyncRateLimitedTable("t1", delegate,
+        readRateLimiter, null, schedExec);
+    table.init(TestRemoteTable.getMockContext());
+
+    Assert.assertEquals("bar", table.getAsync("foo").get());
+    verify(readFn, times(1)).getAsync(any());
+    verify(readRateLimiter, times(1)).throttle(anyString());
+    verify(readRateLimiter, times(0)).throttle(anyList());
+
+    Assert.assertEquals(result, table.getAllAsync(Arrays.asList("")).get());
+    verify(readFn, times(1)).getAllAsync(any());
+    verify(readRateLimiter, times(1)).throttle(anyList());
+    verify(readRateLimiter, times(1)).throttle(anyString());
+  }
+
+  /**
+   * Verifies that put, putAll, delete and deleteAll each reach the write function once and
+   * each trigger exactly one call on a distinct overload of the write rate limiter. The
+   * verified counts are cumulative across the four steps.
+   */
+  @Test
+  public void testPutThrottling() throws Exception {
+    TableRateLimiter readRateLimiter = mock(TableRateLimiter.class);
+    TableRateLimiter writeRateLimiter = mock(TableRateLimiter.class);
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any());
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAllAsync(any());
+    AsyncReadWriteTable delegate = new AsyncRemoteTable<>(readFn, writeFn);
+    AsyncRateLimitedTable table = new AsyncRateLimitedTable("t1", delegate,
+        readRateLimiter, writeRateLimiter, schedExec);
+    table.init(TestRemoteTable.getMockContext());
+
+    table.putAsync("foo", "bar").get();
+    verify(writeFn, times(1)).putAsync(any(), any());
+    verify(writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    verify(writeRateLimiter, times(0)).throttleRecords(anyList());
+    verify(writeRateLimiter, times(0)).throttle(anyString());
+    verify(writeRateLimiter, times(0)).throttle(anyList());
+
+    table.putAllAsync(Arrays.asList(new Entry<>("1", "2"))).get();
+    verify(writeFn, times(1)).putAllAsync(any());
+    verify(writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    verify(writeRateLimiter, times(1)).throttleRecords(anyList());
+    verify(writeRateLimiter, times(0)).throttle(anyString());
+    verify(writeRateLimiter, times(0)).throttle(anyList());
+
+    table.deleteAsync("foo").get();
+    verify(writeFn, times(1)).deleteAsync(anyString());
+    verify(writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    verify(writeRateLimiter, times(1)).throttleRecords(anyList());
+    verify(writeRateLimiter, times(1)).throttle(anyString());
+    verify(writeRateLimiter, times(0)).throttle(anyList());
+
+    table.deleteAllAsync(Arrays.asList("1", "2")).get();
+    verify(writeFn, times(1)).deleteAllAsync(any());
+    verify(writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    verify(writeRateLimiter, times(1)).throttleRecords(anyList());
+    verify(writeRateLimiter, times(1)).throttle(anyString());
+    verify(writeRateLimiter, times(1)).throttle(anyList());
+  }
+
+  /**
+   * Verifies that flush reaches the write function and that close reaches both the read
+   * and the write function.
+   */
+  @Test
+  public void testFlushAndClose() {
+    TableRateLimiter readRateLimiter = mock(TableRateLimiter.class);
+    TableRateLimiter writeRateLimiter = mock(TableRateLimiter.class);
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    AsyncReadWriteTable delegate = new AsyncRemoteTable<>(readFn, writeFn);
+    AsyncRateLimitedTable table = new AsyncRateLimitedTable("t1", delegate,
+        readRateLimiter, writeRateLimiter, schedExec);
+    table.init(TestRemoteTable.getMockContext());
+
+    table.flush();
+    verify(writeFn, times(1)).flush();
+
+    table.close();
+    verify(readFn, times(1)).close();
+    verify(writeFn, times(1)).close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
new file mode 100644
index 0000000..2a447e0
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.remote;
+
+import java.util.Arrays;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.samza.table.BaseReadWriteTable.Func0;
+
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link AsyncRemoteTable}, exercised both as a read-only table (no write
+ * function) and as a read-write table sharing the same mocked read function.
+ */
+public class TestAsyncRemoteTable {
+
+  private TableReadFunction<Integer, Integer> readFn;
+  private TableWriteFunction<Integer, Integer> writeFn;
+  private AsyncRemoteTable<Integer, Integer> roTable;
+  private AsyncRemoteTable<Integer, Integer> rwTable;
+
+  /** Creates fresh mocks and the read-only / read-write tables built on them. */
+  @Before
+  public void prepare() {
+    readFn = mock(TableReadFunction.class);
+    writeFn = mock(TableWriteFunction.class);
+    roTable = new AsyncRemoteTable<>(readFn, null);
+    rwTable = new AsyncRemoteTable<>(readFn, writeFn);
+  }
+
+  /** getAsync reaches the shared read function from both tables. */
+  @Test
+  public void testGetAsync() {
+    int expectedCalls = 0;
+    roTable.getAsync(1);
+    verify(readFn, times(++expectedCalls)).getAsync(any());
+    rwTable.getAsync(1);
+    verify(readFn, times(++expectedCalls)).getAsync(any());
+  }
+
+  /** getAllAsync reaches the shared read function from both tables. */
+  @Test
+  public void testGetAllAsync() {
+    int expectedCalls = 0;
+    roTable.getAllAsync(Arrays.asList(1, 2));
+    verify(readFn, times(++expectedCalls)).getAllAsync(any());
+    rwTable.getAllAsync(Arrays.asList(1, 2));
+    verify(readFn, times(++expectedCalls)).getAllAsync(any());
+  }
+
+  /** putAsync fails with NPE without a write function, otherwise reaches the write function. */
+  @Test
+  public void testPutAsync() {
+    verifyFailure(() -> roTable.putAsync(1, 2));
+    rwTable.putAsync(1, 2);
+    verify(writeFn, times(1)).putAsync(any(), any());
+  }
+
+  /** putAllAsync fails with NPE without a write function, otherwise reaches the write function. */
+  @Test
+  public void testPutAllAsync() {
+    verifyFailure(() -> roTable.putAllAsync(Arrays.asList(new Entry<>(1, 2))));
+    rwTable.putAllAsync(Arrays.asList(new Entry<>(1, 2)));
+    verify(writeFn, times(1)).putAllAsync(any());
+  }
+
+  /** deleteAsync fails with NPE without a write function, otherwise reaches the write function. */
+  @Test
+  public void testDeleteAsync() {
+    verifyFailure(() -> roTable.deleteAsync(1));
+    rwTable.deleteAsync(1);
+    verify(writeFn, times(1)).deleteAsync(any());
+  }
+
+  /** deleteAllAsync fails with NPE without a write function, otherwise reaches the write function. */
+  @Test
+  public void testDeleteAllAsync() {
+    verifyFailure(() -> roTable.deleteAllAsync(Arrays.asList(1)));
+    rwTable.deleteAllAsync(Arrays.asList(1, 2));
+    verify(writeFn, times(1)).deleteAllAsync(any());
+  }
+
+  /** init reaches the read function from both tables and the write function only from the read-write one. */
+  @Test
+  public void testInit() {
+    roTable.init(mock(Context.class));
+    verify(readFn, times(1)).init(any());
+    verify(writeFn, times(0)).init(any());
+    rwTable.init(mock(Context.class));
+    verify(readFn, times(2)).init(any());
+    verify(writeFn, times(1)).init(any());
+  }
+
+  /** close reaches the read function from both tables and the write function only from the read-write one. */
+  @Test
+  public void testClose() {
+    roTable.close();
+    verify(readFn, times(1)).close();
+    verify(writeFn, times(0)).close();
+    rwTable.close();
+    verify(readFn, times(2)).close();
+    verify(writeFn, times(1)).close();
+  }
+
+  /** flush reaches the write function only from the read-write table. */
+  @Test
+  public void testFlush() {
+    roTable.flush();
+    verify(writeFn, times(0)).flush();
+    rwTable.flush();
+    verify(writeFn, times(1)).flush();
+  }
+
+  /** Expects a NullPointerException when the table is built without a read function. */
+  @Test(expected = NullPointerException.class)
+  public void testFailOnNullReadFn() {
+    new AsyncRemoteTable<>(null, null);
+  }
+
+  /**
+   * Runs the given action and fails the test unless it throws a NullPointerException.
+   *
+   * @param func action expected to throw
+   */
+  private void verifyFailure(Func0 func) {
+    boolean caughtException = false;
+    try {
+      func.apply();
+    } catch (NullPointerException ex) {
+      caughtException = true;
+    }
+    assertTrue("Expected a NullPointerException to be thrown", caughtException);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index ae96d86..fec28cd 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -26,8 +26,6 @@ import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
 
 import org.junit.Assert;
@@ -56,6 +54,7 @@ import static org.mockito.Mockito.verify;
 
 
 public class TestRemoteTable {
+
   private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
 
   public static Context getMockContext() {
@@ -68,28 +67,27 @@ public class TestRemoteTable {
     return context;
   }
 
-  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
-    return getTable(tableId, readFn, writeFn, null);
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, TableReadFunction<K, V> readFn,
+      TableWriteFunction<K, V> writeFn, boolean retry) {
+    return getTable(tableId, readFn, writeFn, null, retry);
   }
 
-  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
-    RemoteTable<K, V> table;
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, TableReadFunction<K, V> readFn,
+      TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor, boolean retry) {
 
     TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
     TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
-    doReturn(true).when(readRateLimiter).isRateLimited();
-    doReturn(true).when(writeRateLimiter).isRateLimited();
-
-    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
-
-    table = new RemoteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
 
-    Context context = getMockContext();
+    TableRetryPolicy readPolicy = retry ? new TableRetryPolicy() : null;
+    TableRetryPolicy writePolicy = retry ? new TableRetryPolicy() : null;
 
-    table.init(context);
+    ExecutorService rateLimitingExecutor = Executors.newSingleThreadExecutor();
+    ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
 
+    RemoteTable<K, V> table = new RemoteTable(tableId, readFn, writeFn,
+        readRateLimiter, writeRateLimiter, rateLimitingExecutor,
+        readPolicy, writePolicy, retryExecutor, cbExecutor);
+    table.init(getMockContext());
     return (T) table;
   }
 
@@ -114,12 +112,10 @@ public class TestRemoteTable {
     }
     if (retry) {
       doReturn(true).when(readFn).isRetriable(any());
-      TableRetryPolicy policy = new TableRetryPolicy();
-      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
     }
-    RemoteTable<String, String> table = getTable(tableId, readFn, null);
+    RemoteTable<String, String> table = getTable(tableId, readFn, null, retry);
     Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
-    verify(table.readRateLimiter, times(1)).throttle(anyString());
+    verify(table.readRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString());
   }
 
   @Test
@@ -151,8 +147,8 @@ public class TestRemoteTable {
     doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
     doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
 
-    RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
-    RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+    RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null, false);
+    RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null, false);
 
     CompletableFuture<String> future1 = table1.getAsync("foo1");
     CompletableFuture<String> future2 = table2.getAsync("foo2");
@@ -191,9 +187,8 @@ public class TestRemoteTable {
       } else {
         doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
       }
-      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
     }
-    RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+    RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn, retry);
     if (sync) {
       table.put("foo", isDelete ? null : "bar");
     } else {
@@ -209,9 +204,9 @@ public class TestRemoteTable {
     }
     Assert.assertEquals("foo", keyCaptor.getValue());
     if (isDelete) {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString());
+      verify(table.writeRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString());
     } else {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
+      verify(table.writeRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString(), anyString());
     }
   }
 
@@ -248,7 +243,7 @@ public class TestRemoteTable {
   private void doTestDelete(boolean sync, boolean error) throws Exception {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
     RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
+        mock(TableReadFunction.class), writeFn, false);
     CompletableFuture<Void> future;
     if (error) {
       future = new CompletableFuture();
@@ -300,7 +295,7 @@ public class TestRemoteTable {
     }
     // Sync is backed by async so needs to mock the async method
     doReturn(future).when(readFn).getAllAsync(any());
-    RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
+    RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null, false);
     Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
         : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
     verify(table.readRateLimiter, times(1)).throttle(anyCollection());
@@ -330,7 +325,7 @@ public class TestRemoteTable {
   public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
     RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
-        mock(TableReadFunction.class), writeFn);
+        mock(TableReadFunction.class), writeFn, false);
     CompletableFuture<Void> future;
     if (error) {
       future = new CompletableFuture();
@@ -393,7 +388,7 @@ public class TestRemoteTable {
   public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
     RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
+        mock(TableReadFunction.class), writeFn, false);
     CompletableFuture<Void> future;
     if (error) {
       future = new CompletableFuture();
@@ -433,7 +428,7 @@ public class TestRemoteTable {
   @Test
   public void testFlush() {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
+    RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn, false);
     table.flush();
     verify(writeFn, times(1)).flush();
   }
@@ -444,7 +439,7 @@ public class TestRemoteTable {
     // Sync is backed by async so needs to mock the async method
     doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
     RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
-        Executors.newSingleThreadExecutor());
+        Executors.newSingleThreadExecutor(), false);
     Thread testThread = Thread.currentThread();
 
     table.getAsync("foo").thenAccept(result -> {

http://git-wip-us.apache.org/repos/asf/samza/blob/4289ca91/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 907242f..4703752 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -33,17 +33,20 @@ import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.AsyncReadWriteTable;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
+import org.apache.samza.table.remote.AsyncRemoteTable;
 import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.RemoteTableProvider;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.AsyncRetriableTable;
 import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.testUtils.TestUtils;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
 
@@ -209,16 +212,26 @@ public class TestRemoteTableDescriptor {
     Table table = provider.getTable();
     Assert.assertTrue(table instanceof RemoteTable);
     RemoteTable rwTable = (RemoteTable) table;
+
+    AsyncReadWriteTable delegate = TestUtils.getFieldValue(rwTable, "asyncTable");
+    Assert.assertTrue(delegate instanceof AsyncRetriableTable);
+    if (rlGets || rlPuts) {
+      delegate = TestUtils.getFieldValue(delegate, "table");
+      Assert.assertTrue(delegate instanceof AsyncRateLimitedTable);
+    }
+    delegate = TestUtils.getFieldValue(delegate, "table");
+    Assert.assertTrue(delegate instanceof AsyncRemoteTable);
+
     if (numRateLimitOps > 0) {
-      Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null);
-      Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null);
+      TableRateLimiter readRateLimiter = TestUtils.getFieldValue(rwTable, "readRateLimiter");
+      TableRateLimiter writeRateLimiter = TestUtils.getFieldValue(rwTable, "writeRateLimiter");
+      Assert.assertTrue(!rlGets || readRateLimiter != null);
+      Assert.assertTrue(!rlPuts || writeRateLimiter != null);
     }
 
-    ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.getCallbackExecutor();
+    ThreadPoolExecutor callbackExecutor = TestUtils.getFieldValue(rwTable, "callbackExecutor");
     Assert.assertEquals(10, callbackExecutor.getCorePoolSize());
 
-    Assert.assertTrue(rwTable.getReadFn() instanceof RetriableReadFunction);
-    Assert.assertFalse(rwTable.getWriteFn() instanceof RetriableWriteFunction);
   }
 
   @Test