You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/12/17 23:11:36 UTC

[3/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 93a0521..8b6bc1a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -21,8 +21,7 @@ package org.apache.samza.table.remote;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -45,9 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class RemoteTableProvider extends BaseTableProvider {
 
-  private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
-
-  private boolean readOnly;
+  private final List<RemoteTable<?, ?>> tables = new ArrayList<>();
 
   /**
    * Map of tableId -> executor service for async table IO and callbacks. The same executors
@@ -63,21 +60,13 @@ public class RemoteTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public void init(Context context) {
-    super.init(context);
-    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
-    this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
-  }
-
-  @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
 
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
 
-    RemoteReadableTable table;
-
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
 
+    // Read part
     TableReadFunction readFn = getReadFn(tableConfig);
     RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
     if (rateLimiter != null) {
@@ -86,34 +75,29 @@ public class RemoteTableProvider extends BaseTableProvider {
     TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
     TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
 
-    TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
-    TableRateLimiter writeRateLimiter = null;
-
     TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
-    TableRetryPolicy writeRetryPolicy = null;
-
-    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
-      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
-          Thread thread = new Thread(runnable);
-          thread.setName("table-retry-executor");
-          thread.setDaemon(true);
-          return thread;
-        });
-    }
-
     if (readRetryPolicy != null) {
+      if (retryExecutor == null) {
+        retryExecutor = createRetryExecutor();
+      }
       readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
     }
 
-    TableWriteFunction writeFn = getWriteFn(tableConfig);
-
     boolean isRateLimited = readRateLimiter.isRateLimited();
-    if (!readOnly) {
-      writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+
+    // Write part
+    TableWriteFunction writeFn = getWriteFn(tableConfig);
+    TableRateLimiter writeRateLimiter = null;
+    TableRetryPolicy writeRetryPolicy = null;
+    if (writeFn != null) {
+      TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
       writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
       isRateLimited |= writeRateLimiter.isRateLimited();
       writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
       if (writeRetryPolicy != null) {
+        if (retryExecutor == null) {
+          retryExecutor = createRetryExecutor();
+        }
         writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
       }
     }
@@ -140,13 +124,8 @@ public class RemoteTableProvider extends BaseTableProvider {
             }));
     }
 
-    if (readOnly) {
-      table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
-          tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    } else {
-      table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
-          writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    }
+    RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter,
+        writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
 
     TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
     if (readRetryPolicy != null) {
@@ -192,5 +171,14 @@ public class RemoteTableProvider extends BaseTableProvider {
     }
     return writeFn;
   }
+
+  private ScheduledExecutorService createRetryExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(runnable -> {
+        Thread thread = new Thread(runnable);
+        thread.setName("table-retry-executor");
+        thread.setDaemon(true);
+        return thread;
+      });
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
new file mode 100644
index 0000000..df6833e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Utility class that contains the default set of read metrics.
+ */
+public class TableMetrics {
+
+  // Read metrics
+  public final Timer getNs;
+  public final Timer getAllNs;
+  public final Counter numGets;
+  public final Counter numGetAlls;
+  public final Counter numMissedLookups;
+  // Write metrics
+  public final Counter numPuts;
+  public final Timer putNs;
+  public final Counter numPutAlls;
+  public final Timer putAllNs;
+  public final Counter numDeletes;
+  public final Timer deleteNs;
+  public final Counter numDeleteAlls;
+  public final Timer deleteAllNs;
+  public final Counter numFlushes;
+  public final Timer flushNs;
+
+  /**
+   * Constructor based on container and task container context
+   *
+   * @param context {@link Context} for this task
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public TableMetrics(Context context, Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+    // Read metrics
+    numGets = tableMetricsUtil.newCounter("num-gets");
+    getNs = tableMetricsUtil.newTimer("get-ns");
+    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+    // Write metrics
+    numPuts = tableMetricsUtil.newCounter("num-puts");
+    putNs = tableMetricsUtil.newTimer("put-ns");
+    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+    numDeletes = tableMetricsUtil.newCounter("num-deletes");
+    deleteNs = tableMetricsUtil.newTimer("delete-ns");
+    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+    numFlushes = tableMetricsUtil.newCounter("num-flushes");
+    flushNs = tableMetricsUtil.newTimer("flush-ns");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
deleted file mode 100644
index e77fcfd..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class TableReadMetrics {
-
-  public final Timer getNs;
-  public final Timer getAllNs;
-  public final Counter numGets;
-  public final Counter numGetAlls;
-  public final Counter numMissedLookups;
-
-  /**
-   * Constructor based on container and task container context
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public TableReadMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    numGets = tableMetricsUtil.newCounter("num-gets");
-    getNs = tableMetricsUtil.newTimer("get-ns");
-    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
-    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
-    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
deleted file mode 100644
index bf65b74..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-public class TableWriteMetrics {
-
-  public final Counter numPuts;
-  public final Timer putNs;
-  public final Counter numPutAlls;
-  public final Timer putAllNs;
-  public final Counter numDeletes;
-  public final Timer deleteNs;
-  public final Counter numDeleteAlls;
-  public final Timer deleteAllNs;
-  public final Counter numFlushes;
-  public final Timer flushNs;
-
-  /**
-   * Utility class that contains the default set of write metrics.
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public TableWriteMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    numPuts = tableMetricsUtil.newCounter("num-puts");
-    putNs = tableMetricsUtil.newTimer("put-ns");
-    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
-    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
-    numDeletes = tableMetricsUtil.newCounter("num-deletes");
-    deleteNs = tableMetricsUtil.newTimer("delete-ns");
-    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
-    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
-    numFlushes = tableMetricsUtil.newCounter("num-flushes");
-    flushNs = tableMetricsUtil.newTimer("flush-ns");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 4112c8b..8fd161b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
@@ -71,7 +71,7 @@ public class TestStreamTableJoinOperatorImpl {
             return record.getKey();
           }
         });
-    ReadableTable table = mock(ReadableTable.class);
+    ReadWriteTable table = mock(ReadWriteTable.class);
     when(table.get("1")).thenReturn("r1");
     when(table.get("2")).thenReturn(null);
     Context context = new MockContext();

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index a3b1963..e60b6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -49,12 +49,12 @@ public class TestTableManager {
 
   public static class DummyTableProviderFactory implements TableProviderFactory {
 
-    static ReadableTable table;
+    static ReadWriteTable table;
     static TableProvider tableProvider;
 
     @Override
     public TableProvider getTableProvider(String tableId) {
-      table = mock(ReadableTable.class);
+      table = mock(ReadWriteTable.class);
       tableProvider = mock(TableProvider.class);
       when(tableProvider.getTable()).thenReturn(table);
       return tableProvider;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 5a19767..c304bfd 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -35,11 +35,10 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
@@ -138,11 +137,11 @@ public class TestCachingTable {
     return Pair.of(cacheTable, cacheStore);
   }
 
-  private void initTables(ReadableTable ... tables) {
+  private void initTables(ReadWriteTable ... tables) {
     initTables(false, tables);
   }
 
-  private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+  private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) {
     Map<String, String> config = new HashMap<>();
     if (isTimerMetricsDisabled) {
       config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -242,7 +241,7 @@ public class TestCachingTable {
 
   @Test
   public void testNonexistentKeyInTable() {
-    ReadableTable<String, String> table = mock(ReadableTable.class);
+    ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
     doReturn(CompletableFuture.completedFuture(null)).when(table).getAsync(any());
     ReadWriteTable<String, String> cache = getMockCache().getLeft();
     CachingTable<String, String> cachingTable = new CachingTable<>("myTable", table, cache, false);
@@ -255,7 +254,7 @@ public class TestCachingTable {
 
   @Test
   public void testKeyEviction() {
-    ReadableTable<String, String> table = mock(ReadableTable.class);
+    ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
     doReturn(CompletableFuture.completedFuture("3")).when(table).getAsync(any());
     ReadWriteTable<String, String> cache = mock(ReadWriteTable.class);
 
@@ -283,7 +282,7 @@ public class TestCachingTable {
     TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+    final RemoteTable<String, String> remoteTable = new RemoteTable<>(
         tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 
@@ -402,7 +401,7 @@ public class TestCachingTable {
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
 
-    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+    final RemoteTable<String, String> remoteTable = new RemoteTable<>(
         tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
index 486295a..a764a8b 100644
--- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
@@ -30,7 +30,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 import org.junit.Test;
@@ -143,7 +143,7 @@ public class TestLocalTableDescriptor {
     }
 
     @Override
-    public Table getTable() {
+    public ReadWriteTable getTable() {
       throw new SamzaException("Not implemented");
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
deleted file mode 100644
index d7733a8..0000000
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestRemoteReadWriteTable {
-  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
-
-  public static Context getMockContext() {
-    Context context = new MockContext();
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
-    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
-    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
-    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
-    return context;
-  }
-
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
-    return getTable(tableId, readFn, writeFn, null);
-  }
-
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
-    RemoteReadableTable<K, V> table;
-
-    TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
-    TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
-    doReturn(true).when(readRateLimiter).isRateLimited();
-    doReturn(true).when(writeRateLimiter).isRateLimited();
-
-    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
-
-    if (writeFn == null) {
-      table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
-    } else {
-      table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
-    }
-
-    Context context = getMockContext();
-
-    table.init(context);
-
-    return (T) table;
-  }
-
-  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
-    String tableId = "testGet-" + sync + error + retry;
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    // Sync is backed by async so needs to mock the async method
-    CompletableFuture<String> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-      if (!retry) {
-        doReturn(future).when(readFn).getAsync(anyString());
-      } else {
-        final int [] times = new int[] {0};
-        doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
-            .when(readFn).getAsync(anyString());
-      }
-    } else {
-      future = CompletableFuture.completedFuture("bar");
-      doReturn(future).when(readFn).getAsync(anyString());
-    }
-    if (retry) {
-      doReturn(true).when(readFn).isRetriable(any());
-      TableRetryPolicy policy = new TableRetryPolicy();
-      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
-    }
-    RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
-    Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
-    verify(table.readRateLimiter, times(1)).throttle(anyString());
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    doTestGet(true, false, false);
-  }
-
-  @Test
-  public void testGetAsync() throws Exception {
-    doTestGet(false, false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testGetAsyncError() throws Exception {
-    doTestGet(false, true, false);
-  }
-
-  @Test
-  public void testGetAsyncErrorRetried() throws Exception {
-    doTestGet(false, true, true);
-  }
-
-  @Test
-  public void testGetMultipleTables() {
-    TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
-    TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
-
-    // Sync is backed by async so needs to mock the async method
-    doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
-    doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
-
-    RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
-    RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
-
-    CompletableFuture<String> future1 = table1.getAsync("foo1");
-    CompletableFuture<String> future2 = table2.getAsync("foo2");
-
-    CompletableFuture.allOf(future1, future2)
-        .thenAccept(u -> {
-            Assert.assertEquals(future1.join(), "bar1");
-            Assert.assertEquals(future2.join(), "bar1");
-          });
-  }
-
-  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
-    String tableId = "testPut-" + sync + error + isDelete + retry;
-    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
-    TableWriteFunction<String, String> writeFn = mockWriteFn;
-    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
-    CompletableFuture<Void> failureFuture = new CompletableFuture();
-    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
-    if (!error) {
-      if (isDelete) {
-        doReturn(successFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doReturn(successFuture).when(writeFn).putAsync(any(), any());
-      }
-    } else if (!retry) {
-      if (isDelete) {
-        doReturn(failureFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
-      }
-    } else {
-      doReturn(true).when(writeFn).isRetriable(any());
-      final int [] times = new int[] {0};
-      if (isDelete) {
-        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
-      }
-      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
-    }
-    RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
-    if (sync) {
-      table.put("foo", isDelete ? null : "bar");
-    } else {
-      table.putAsync("foo", isDelete ? null : "bar").get();
-    }
-    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
-    if (isDelete) {
-      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
-    } else {
-      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
-      Assert.assertEquals("bar", valCaptor.getValue());
-    }
-    Assert.assertEquals("foo", keyCaptor.getValue());
-    if (isDelete) {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString());
-    } else {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
-    }
-  }
-
-  @Test
-  public void testPut() throws Exception {
-    doTestPut(true, false, false, false);
-  }
-
-  @Test
-  public void testPutDelete() throws Exception {
-    doTestPut(true, false, true, false);
-  }
-
-  @Test
-  public void testPutAsync() throws Exception {
-    doTestPut(false, false, false, false);
-  }
-
-  @Test
-  public void testPutAsyncDelete() throws Exception {
-    doTestPut(false, false, true, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testPutAsyncError() throws Exception {
-    doTestPut(false, true, false, false);
-  }
-
-  @Test
-  public void testPutAsyncErrorRetried() throws Exception {
-    doTestPut(false, true, false, true);
-  }
-
-  private void doTestDelete(boolean sync, boolean error) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).deleteAsync(any());
-    ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
-    if (sync) {
-      table.delete("foo");
-    } else {
-      table.deleteAsync("foo").get();
-    }
-    verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
-    Assert.assertEquals("foo", argCaptor.getValue());
-    verify(table.writeRateLimiter, times(1)).throttle(anyString());
-  }
-
-  @Test
-  public void testDelete() throws Exception {
-    doTestDelete(true, false);
-  }
-
-  @Test
-  public void testDeleteAsync() throws Exception {
-    doTestDelete(false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testDeleteAsyncError() throws Exception {
-    doTestDelete(false, true);
-  }
-
-  private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    Map<String, String> res = new HashMap<>();
-    res.put("foo1", "bar1");
-    if (!partial) {
-      res.put("foo2", "bar2");
-    }
-    CompletableFuture<Map<String, String>> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(res);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(readFn).getAllAsync(any());
-    RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
-    Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
-        : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
-    verify(table.readRateLimiter, times(1)).throttle(anyCollection());
-  }
-
-  @Test
-  public void testGetAll() throws Exception {
-    doTestGetAll(true, false, false);
-  }
-
-  @Test
-  public void testGetAllAsync() throws Exception {
-    doTestGetAll(false, false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testGetAllAsyncError() throws Exception {
-    doTestGetAll(false, true, false);
-  }
-
-  // Partial result is an acceptable scenario
-  @Test
-  public void testGetAllPartialResult() throws Exception {
-    doTestGetAll(false, false, true);
-  }
-
-  public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).putAllAsync(any());
-    if (hasDelete) {
-      doReturn(future).when(writeFn).deleteAllAsync(any());
-    }
-    List<Entry<String, String>> entries = Arrays.asList(
-        new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
-    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
-    if (sync) {
-      table.putAll(entries);
-    } else {
-      table.putAllAsync(entries).get();
-    }
-    verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
-    if (hasDelete) {
-      ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
-      verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
-      Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
-      Assert.assertEquals(1, argCaptor.getValue().size());
-      Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
-      verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
-    } else {
-      Assert.assertEquals(entries, argCaptor.getValue());
-    }
-    verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
-  }
-
-  @Test
-  public void testPutAll() throws Exception {
-    doTestPutAll(true, false, false);
-  }
-
-  @Test
-  public void testPutAllHasDelete() throws Exception {
-    doTestPutAll(true, false, true);
-  }
-
-  @Test
-  public void testPutAllAsync() throws Exception {
-    doTestPutAll(false, false, false);
-  }
-
-  @Test
-  public void testPutAllAsyncHasDelete() throws Exception {
-    doTestPutAll(false, false, true);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testPutAllAsyncError() throws Exception {
-    doTestPutAll(false, true, false);
-  }
-
-  public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).deleteAllAsync(any());
-    List<String> keys = Arrays.asList("foo1", "foo2");
-    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
-    if (sync) {
-      table.deleteAll(keys);
-    } else {
-      table.deleteAllAsync(keys).get();
-    }
-    verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
-    Assert.assertEquals(keys, argCaptor.getValue());
-    verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
-  }
-
-  @Test
-  public void testDeleteAll() throws Exception {
-    doTestDeleteAll(true, false);
-  }
-
-  @Test
-  public void testDeleteAllAsync() throws Exception {
-    doTestDeleteAll(false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testDeleteAllAsyncError() throws Exception {
-    doTestDeleteAll(false, true);
-  }
-
-  @Test
-  public void testFlush() {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
-    table.flush();
-    verify(writeFn, times(1)).flush();
-  }
-
-  @Test
-  public void testGetWithCallbackExecutor() throws Exception {
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    // Sync is backed by async so needs to mock the async method
-    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
-    RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
-        Executors.newSingleThreadExecutor());
-    Thread testThread = Thread.currentThread();
-
-    table.getAsync("foo").thenAccept(result -> {
-        Assert.assertEquals("bar", result);
-        // Must be executed on the executor thread
-        Assert.assertNotSame(testThread, Thread.currentThread());
-      });
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
new file mode 100644
index 0000000..ae96d86
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteTable {
+  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+  public static Context getMockContext() {
+    Context context = new MockContext();
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
+    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
+    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
+    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
+    return context;
+  }
+
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
+    return getTable(tableId, readFn, writeFn, null);
+  }
+
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
+    RemoteTable<K, V> table;
+
+    TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
+    TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
+    doReturn(true).when(readRateLimiter).isRateLimited();
+    doReturn(true).when(writeRateLimiter).isRateLimited();
+
+    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
+
+    table = new RemoteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
+
+    Context context = getMockContext();
+
+    table.init(context);
+
+    return (T) table;
+  }
+
+  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+    String tableId = "testGet-" + sync + error + retry;
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    // Sync is backed by async so needs to mock the async method
+    CompletableFuture<String> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+      if (!retry) {
+        doReturn(future).when(readFn).getAsync(anyString());
+      } else {
+        final int [] times = new int[] {0};
+        doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
+            .when(readFn).getAsync(anyString());
+      }
+    } else {
+      future = CompletableFuture.completedFuture("bar");
+      doReturn(future).when(readFn).getAsync(anyString());
+    }
+    if (retry) {
+      doReturn(true).when(readFn).isRetriable(any());
+      TableRetryPolicy policy = new TableRetryPolicy();
+      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
+    }
+    RemoteTable<String, String> table = getTable(tableId, readFn, null);
+    Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
+    verify(table.readRateLimiter, times(1)).throttle(anyString());
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    doTestGet(true, false, false);
+  }
+
+  @Test
+  public void testGetAsync() throws Exception {
+    doTestGet(false, false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testGetAsyncError() throws Exception {
+    doTestGet(false, true, false);
+  }
+
+  @Test
+  public void testGetAsyncErrorRetried() throws Exception {
+    doTestGet(false, true, true);
+  }
+
+  @Test
+  public void testGetMultipleTables() {
+    TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
+    TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
+
+    // Sync is backed by async so needs to mock the async method
+    doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
+    doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
+
+    RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
+    RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+
+    CompletableFuture<String> future1 = table1.getAsync("foo1");
+    CompletableFuture<String> future2 = table2.getAsync("foo2");
+
+    CompletableFuture.allOf(future1, future2)
+        .thenAccept(u -> {
+            Assert.assertEquals(future1.join(), "bar1");
+            Assert.assertEquals(future2.join(), "bar1");
+          });
+  }
+
+  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+    String tableId = "testPut-" + sync + error + isDelete + retry;
+    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+    TableWriteFunction<String, String> writeFn = mockWriteFn;
+    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+    CompletableFuture<Void> failureFuture = new CompletableFuture();
+    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+    if (!error) {
+      if (isDelete) {
+        doReturn(successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(successFuture).when(writeFn).putAsync(any(), any());
+      }
+    } else if (!retry) {
+      if (isDelete) {
+        doReturn(failureFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+      }
+    } else {
+      doReturn(true).when(writeFn).isRetriable(any());
+      final int [] times = new int[] {0};
+      if (isDelete) {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+      }
+      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
+    }
+    RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+    if (sync) {
+      table.put("foo", isDelete ? null : "bar");
+    } else {
+      table.putAsync("foo", isDelete ? null : "bar").get();
+    }
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
+    if (isDelete) {
+      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
+    } else {
+      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
+      Assert.assertEquals("bar", valCaptor.getValue());
+    }
+    Assert.assertEquals("foo", keyCaptor.getValue());
+    if (isDelete) {
+      verify(table.writeRateLimiter, times(1)).throttle(anyString());
+    } else {
+      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    }
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    doTestPut(true, false, false, false);
+  }
+
+  @Test
+  public void testPutDelete() throws Exception {
+    doTestPut(true, false, true, false);
+  }
+
+  @Test
+  public void testPutAsync() throws Exception {
+    doTestPut(false, false, false, false);
+  }
+
+  @Test
+  public void testPutAsyncDelete() throws Exception {
+    doTestPut(false, false, true, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testPutAsyncError() throws Exception {
+    doTestPut(false, true, false, false);
+  }
+
+  @Test
+  public void testPutAsyncErrorRetried() throws Exception {
+    doTestPut(false, true, false, true);
+  }
+
+  private void doTestDelete(boolean sync, boolean error) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).deleteAsync(any());
+    ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
+    if (sync) {
+      table.delete("foo");
+    } else {
+      table.deleteAsync("foo").get();
+    }
+    verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
+    Assert.assertEquals("foo", argCaptor.getValue());
+    verify(table.writeRateLimiter, times(1)).throttle(anyString());
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    doTestDelete(true, false);
+  }
+
+  @Test
+  public void testDeleteAsync() throws Exception {
+    doTestDelete(false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testDeleteAsyncError() throws Exception {
+    doTestDelete(false, true);
+  }
+
+  private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    Map<String, String> res = new HashMap<>();
+    res.put("foo1", "bar1");
+    if (!partial) {
+      res.put("foo2", "bar2");
+    }
+    CompletableFuture<Map<String, String>> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(res);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(readFn).getAllAsync(any());
+    RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
+    Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
+        : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
+    verify(table.readRateLimiter, times(1)).throttle(anyCollection());
+  }
+
+  @Test
+  public void testGetAll() throws Exception {
+    doTestGetAll(true, false, false);
+  }
+
+  @Test
+  public void testGetAllAsync() throws Exception {
+    doTestGetAll(false, false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testGetAllAsyncError() throws Exception {
+    doTestGetAll(false, true, false);
+  }
+
+  // Partial result is an acceptable scenario
+  @Test
+  public void testGetAllPartialResult() throws Exception {
+    doTestGetAll(false, false, true);
+  }
+
+  public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).putAllAsync(any());
+    if (hasDelete) {
+      doReturn(future).when(writeFn).deleteAllAsync(any());
+    }
+    List<Entry<String, String>> entries = Arrays.asList(
+        new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
+    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+    if (sync) {
+      table.putAll(entries);
+    } else {
+      table.putAllAsync(entries).get();
+    }
+    verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
+    if (hasDelete) {
+      ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
+      verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
+      Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
+      Assert.assertEquals(1, argCaptor.getValue().size());
+      Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
+      verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+    } else {
+      Assert.assertEquals(entries, argCaptor.getValue());
+    }
+    verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
+  }
+
+  @Test
+  public void testPutAll() throws Exception {
+    doTestPutAll(true, false, false);
+  }
+
+  @Test
+  public void testPutAllHasDelete() throws Exception {
+    doTestPutAll(true, false, true);
+  }
+
+  @Test
+  public void testPutAllAsync() throws Exception {
+    doTestPutAll(false, false, false);
+  }
+
+  @Test
+  public void testPutAllAsyncHasDelete() throws Exception {
+    doTestPutAll(false, false, true);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testPutAllAsyncError() throws Exception {
+    doTestPutAll(false, true, false);
+  }
+
+  public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).deleteAllAsync(any());
+    List<String> keys = Arrays.asList("foo1", "foo2");
+    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+    if (sync) {
+      table.deleteAll(keys);
+    } else {
+      table.deleteAllAsync(keys).get();
+    }
+    verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
+    Assert.assertEquals(keys, argCaptor.getValue());
+    verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+  }
+
+  @Test
+  public void testDeleteAll() throws Exception {
+    doTestDeleteAll(true, false);
+  }
+
+  @Test
+  public void testDeleteAllAsync() throws Exception {
+    doTestDeleteAll(false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testDeleteAllAsyncError() throws Exception {
+    doTestDeleteAll(false, true);
+  }
+
+  @Test
+  public void testFlush() {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
+    table.flush();
+    verify(writeFn, times(1)).flush();
+  }
+
+  @Test
+  public void testGetWithCallbackExecutor() throws Exception {
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    // Sync is backed by async so needs to mock the async method
+    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+    RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
+        Executors.newSingleThreadExecutor());
+    Thread testThread = Thread.currentThread();
+
+    table.getAsync("foo").thenAccept(result -> {
+        Assert.assertEquals("bar", result);
+        // Must be executed on the executor thread
+        Assert.assertNotSame(testThread, Thread.currentThread());
+      });
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 3d8e36f..907242f 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -36,17 +36,17 @@ import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.RemoteTableProvider;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -207,8 +207,8 @@ public class TestRemoteTableDescriptor {
     RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId());
     provider.init(createMockContext(desc));
     Table table = provider.getTable();
-    Assert.assertTrue(table instanceof RemoteReadWriteTable);
-    RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+    Assert.assertTrue(table instanceof RemoteTable);
+    RemoteTable rwTable = (RemoteTable) table;
     if (numRateLimitOps > 0) {
       Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null);
       Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 5116cab..050ea55 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.TestRemoteReadWriteTable;
+import org.apache.samza.table.remote.TestRemoteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 import org.junit.Test;
 
@@ -54,7 +54,7 @@ public class TestRetriableTableFunctions {
 
   public TableMetricsUtil getMetricsUtil(String tableId) {
     Table table = mock(Table.class);
-    Context context = TestRemoteReadWriteTable.getMockContext();
+    Context context = TestRemoteTable.getMockContext();
     return new TableMetricsUtil(context, table, tableId);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
index ddb79ba..fc9ce76 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -19,7 +19,7 @@
 package org.apache.samza.storage.kv.inmemory;
 
 import java.util.Map;
-import junit.framework.Assert;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
@@ -28,7 +28,9 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.LocalTableProviderFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+
 import org.junit.Test;
+import org.junit.Assert;
 
 
 public class TestInMemoryTableDescriptor {

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index 62fb3da..319fb0f 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -19,7 +19,7 @@
 package org.apache.samza.storage.kv.descriptors;
 
 import java.util.Map;
-import junit.framework.Assert;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
@@ -30,7 +30,9 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.LocalTableProviderFactory;
 import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+
 import org.junit.Test;
+import org.junit.Assert;
 
 public class TestRocksDbTableDescriptor {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
deleted file mode 100644
index eae6bb0..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * A store backed readable and writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
-    implements ReadWriteTable<K, V> {
-
-  /**
-   * Constructs an instance of {@link LocalReadWriteTable}
-   * @param tableId the table Id
-   * @param kvStore the backing store
-   */
-  public LocalReadWriteTable(String tableId, KeyValueStore kvStore) {
-    super(tableId, kvStore);
-  }
-
-  @Override
-  public void put(K key, V value) {
-    if (value != null) {
-      instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
-    } else {
-      delete(key);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAsync(K key, V value) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      put(key, value);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void putAll(List<Entry<K, V>> entries) {
-    List<Entry<K, V>> toPut = new LinkedList<>();
-    List<K> toDelete = new LinkedList<>();
-    entries.forEach(e -> {
-        if (e.getValue() != null) {
-          toPut.add(e);
-        } else {
-          toDelete.add(e.getKey());
-        }
-      });
-
-    if (!toPut.isEmpty()) {
-      instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut));
-    }
-
-    if (!toDelete.isEmpty()) {
-      deleteAll(toDelete);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      putAll(entries);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void delete(K key) {
-    instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      delete(key);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void deleteAll(List<K> keys) {
-    instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      deleteAll(keys);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void flush() {
-    instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
-  }
-
-  private interface Func0 {
-    void apply();
-  }
-
-  private void instrument(Counter counter, Timer timer, Func0 func) {
-    incCounter(counter);
-    long startNs = clock.nanoTime();
-    func.apply();
-    updateTimer(timer, clock.nanoTime() - startNs);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
deleted file mode 100644
index 29ddb15..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import com.google.common.base.Supplier;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A store backed readable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
-
-  protected final KeyValueStore<K, V> kvStore;
-
-  /**
-   * Constructs an instance of {@link LocalReadableTable}
-   * @param tableId the table Id
-   * @param kvStore the backing store
-   */
-  public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
-    super(tableId);
-    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
-    this.kvStore = kvStore;
-  }
-
-  @Override
-  public V get(K key) {
-    V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
-    if (result == null) {
-      incCounter(readMetrics.numMissedLookups);
-    }
-    return result;
-  }
-
-  @Override
-  public CompletableFuture<V> getAsync(K key) {
-    CompletableFuture<V> future = new CompletableFuture();
-    try {
-      future.complete(get(key));
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public Map<K, V> getAll(List<K> keys) {
-    Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
-    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
-    return result;
-  }
-
-  @Override
-  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    CompletableFuture<Map<K, V>> future = new CompletableFuture();
-    try {
-      future.complete(getAll(keys));
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void close() {
-    // The KV store is not closed here as it may still be needed by downstream operators,
-    // it will be closed by the SamzaContainer
-  }
-
-  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
-    incCounter(counter);
-    long startNs = clock.nanoTime();
-    T result = func.get();
-    updateTimer(timer, clock.nanoTime() - startNs);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
new file mode 100644
index 0000000..d9767b6
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.BaseReadWriteTable;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A store backed readable and writable table
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalTable<K, V> extends BaseReadWriteTable<K, V> {
+
+  protected final KeyValueStore<K, V> kvStore;
+
+  /**
+   * Constructs an instance of {@link LocalTable}
+   * @param tableId the table Id
+   * @param kvStore the backing store
+   */
+  public LocalTable(String tableId, KeyValueStore kvStore) {
+    super(tableId);
+    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public V get(K key) {
+    V result = instrument(metrics.numGets, metrics.getNs, () -> kvStore.get(key));
+    if (result == null) {
+      incCounter(metrics.numMissedLookups);
+    }
+    return result;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    CompletableFuture<V> future = new CompletableFuture();
+    try {
+      future.complete(get(key));
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> result = instrument(metrics.numGetAlls, metrics.getAllNs, () -> kvStore.getAll(keys));
+    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
+    return result;
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    CompletableFuture<Map<K, V>> future = new CompletableFuture();
+    try {
+      future.complete(getAll(keys));
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public void put(K key, V value) {
+    if (value != null) {
+      instrument(metrics.numPuts, metrics.putNs, () -> kvStore.put(key, value));
+    } else {
+      delete(key);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    CompletableFuture<Void> future = new CompletableFuture();
+    try {
+      put(key, value);
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    List<Entry<K, V>> toPut = new LinkedList<>();
+    List<K> toDelete = new LinkedList<>();
+    entries.forEach(e -> {
+        if (e.getValue() != null) {
+          toPut.add(e);
+        } else {
+          toDelete.add(e.getKey());
+        }
+      });
+
+    if (!toPut.isEmpty()) {
+      instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut));
+    }
+
+    if (!toDelete.isEmpty()) {
+      deleteAll(toDelete);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    CompletableFuture<Void> future = new CompletableFuture();
+    try {
+      putAll(entries);
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public void delete(K key) {
+    instrument(metrics.numDeletes, metrics.deleteNs, () -> kvStore.delete(key));
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    CompletableFuture<Void> future = new CompletableFuture();
+    try {
+      delete(key);
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    instrument(metrics.numDeleteAlls, metrics.deleteAllNs, () -> kvStore.deleteAll(keys));
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    CompletableFuture<Void> future = new CompletableFuture();
+    try {
+      deleteAll(keys);
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public void flush() {
+    instrument(metrics.numFlushes, metrics.flushNs, () -> kvStore.flush());
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators,
+    // it will be closed by the SamzaContainer
+  }
+
+  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    T result = func.get();
+    updateTimer(timer, clock.nanoTime() - startNs);
+    return result;
+  }
+
+  private interface Func0 {
+    void apply();
+  }
+
+  private void instrument(Counter counter, Timer timer, Func0 func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    func.apply();
+    updateTimer(timer, clock.nanoTime() - startNs);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
index 3be61d0..5099a7e 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
@@ -20,8 +20,7 @@ package org.apache.samza.storage.kv;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.BaseTableProvider;
 
 /**
@@ -53,10 +52,10 @@ public class LocalTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
     Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId);
-    ReadableTable table = new LocalReadWriteTable(tableId, kvStore);
+    ReadWriteTable table = new LocalTable(tableId, kvStore);
     table.init(this.context);
     return table;
   }