You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/12/17 23:11:34 UTC

[1/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

Repository: samza
Updated Branches:
  refs/heads/master c5348bf6b -> 6a75503d7


http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
new file mode 100644
index 0000000..e38af39
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import com.google.common.cache.CacheBuilder;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
+import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.generatePageViews;
+import static org.apache.samza.test.table.TestTableData.generateProfiles;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+
+
+public class TestRemoteTableEndToEnd extends AbstractIntegrationTestHarness {
+
+  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+
+  static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
+    private final String serializedProfiles;
+    private transient Map<Integer, Profile> profileMap;
+
+    private InMemoryReadFunction(String profiles) {
+      this.serializedProfiles = profiles;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
+      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
+    }
+
+    @Override
+    public CompletableFuture<Profile> getAsync(Integer key) {
+      return CompletableFuture.completedFuture(profileMap.get(key));
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+
+    static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
+      return new InMemoryReadFunction(serializedProfiles);
+    }
+  }
+
+  static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
+    private transient List<EnrichedPageView> records;
+    private String testName;
+
+    public InMemoryWriteFunction(String testName) {
+      this.testName = testName;
+    }
+
+    // Verify serializable functionality
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+
+      // Write to the global list for verification
+      records = writtenRecords.get(testName);
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
+      records.add(record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Integer key) {
+      records.remove(key);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
+    CachingTableDescriptor<K, V> cachingDesc;
+    if (defaultCache) {
+      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
+      cachingDesc.withReadTtl(Duration.ofMinutes(5));
+      cachingDesc.withWriteTtl(Duration.ofMinutes(5));
+    } else {
+      GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
+      guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
+      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
+    }
+
+    return appDesc.getTable(cachingDesc);
+  }
+
+  static class MyReadFunction implements TableReadFunction {
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null;
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
+    final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
+
+    writtenRecords.put(testName, new ArrayList<>());
+
+    int count = 10;
+    PageView[] pageViews = generatePageViews(count);
+    String profiles = Base64Serializer.serialize(generateProfiles(count));
+
+    int partitionCount = 4;
+    Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+    final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final StreamApplication app = appDesc -> {
+      RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
+      inputTableDesc
+          .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
+          .withRateLimiter(readRateLimiter, null, null);
+
+      // dummy reader
+      TableReadFunction readFn = new MyReadFunction();
+
+      RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+      outputTableDesc
+          .withReadFunction(readFn)
+          .withWriteFunction(writer)
+          .withRateLimiter(writeRateLimiter, null, null);
+
+      Table<KV<Integer, EnrichedPageView>> outputTable = withCache
+          ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
+          : appDesc.getTable(outputTableDesc);
+
+      Table<KV<Integer, Profile>> inputTable = withCache
+          ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
+          : appDesc.getTable(inputTableDesc);
+
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.getInputStream(isd)
+          .map(pv -> new KV<>(pv.getMemberId(), pv))
+          .join(inputTable, new PageViewToProfileJoinFunction())
+          .map(m -> new KV(m.getMemberId(), m))
+          .sendTo(outputTable);
+    };
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    int numExpected = count * partitionCount;
+    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
+    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTable() throws Exception {
+    doTestStreamTableJoinRemoteTable(false, false, "testStreamTableJoinRemoteTable");
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableWithCache() throws Exception {
+    doTestStreamTableJoinRemoteTable(true, false, "testStreamTableJoinRemoteTableWithCache");
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception {
+    doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache");
+  }
+
+  private Context createMockContext() {
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
+    doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
+    Context context = new MockContext();
+    doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
+    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
+    return context;
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testCatchReaderException() {
+    TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
+    CompletableFuture<String> future = new CompletableFuture<>();
+    future.completeExceptionally(new RuntimeException("Expected test exception"));
+    doReturn(future).when(reader).getAsync(anyString());
+    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+    RemoteTable<String, String> table = new RemoteTable<>("table1", reader, null,
+        rateLimitHelper, null, Executors.newSingleThreadExecutor(), null);
+    table.init(createMockContext());
+    table.get("abc");
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testCatchWriterException() {
+    TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+    TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
+    CompletableFuture<String> future = new CompletableFuture<>();
+    future.completeExceptionally(new RuntimeException("Expected test exception"));
+    doReturn(future).when(writer).putAsync(anyString(), any());
+    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+    RemoteTable<String, String> table = new RemoteTable<String, String>(
+        "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
+    table.init(createMockContext());
+    table.put("abc", "efg");
+  }
+
+  @Test
+  public void testUninitializedWriter() {
+    TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+    RemoteTable<String, String> table = new RemoteTable<String, String>(
+        "table1", reader, null, rateLimitHelper, null, Executors.newSingleThreadExecutor(), null);
+    table.init(createMockContext());
+    int failureCount = 0;
+    try {
+      table.put("abc", "efg");
+    } catch (SamzaException ex) {
+      ++failureCount;
+    }
+    try {
+      table.delete("abc");
+    } catch (SamzaException ex) {
+      ++failureCount;
+    }
+    table.flush();
+    table.close();
+    Assert.assertEquals(2, failureCount);
+  }
+}


[4/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

Posted by we...@apache.org.
SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

So far we have not seen much benefit in maintaining separate implementations for ReadableTable and ReadWriteTable, which adds quite a bit of complexity. Hence, this change consolidates them.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #861 from weisong44/SAMZA-2043


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a75503d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a75503d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a75503d

Branch: refs/heads/master
Commit: 6a75503d74ae65b30e2dcf760bba6e1d8050cdba
Parents: c5348bf
Author: Wei Song <ws...@linkedin.com>
Authored: Mon Dec 17 15:11:27 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Mon Dec 17 15:11:27 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/context/TaskContext.java   |  16 +-
 .../org/apache/samza/table/ReadWriteTable.java  |  53 ++-
 .../org/apache/samza/table/ReadableTable.java   |  86 ----
 .../main/java/org/apache/samza/table/Table.java |   2 -
 .../org/apache/samza/table/TableProvider.java   |   4 +-
 .../table/descriptors/BaseTableDescriptor.java  |   1 +
 .../table/descriptors/LocalTableDescriptor.java |   1 +
 .../descriptors/RemoteTableDescriptor.java      |   1 +
 .../apache/samza/table/utils/SerdeUtils.java    |   1 +
 .../table/remote/TestTableRateLimiter.java      |   2 +-
 .../apache/samza/context/TaskContextImpl.java   |   4 +-
 .../operators/impl/SendToTableOperatorImpl.java |   2 +-
 .../impl/StreamTableJoinOperatorImpl.java       |   6 +-
 .../apache/samza/table/BaseReadWriteTable.java  |  69 +++
 .../apache/samza/table/BaseReadableTable.java   |  75 ---
 .../org/apache/samza/table/TableManager.java    |   4 +-
 .../samza/table/caching/CachingTable.java       | 108 ++---
 .../table/caching/CachingTableProvider.java     |   8 +-
 .../table/caching/guava/GuavaCacheTable.java    |   4 +-
 .../caching/guava/GuavaCacheTableProvider.java  |   4 +-
 .../table/remote/RemoteReadWriteTable.java      | 244 ----------
 .../samza/table/remote/RemoteReadableTable.java | 246 ----------
 .../apache/samza/table/remote/RemoteTable.java  | 436 ++++++++++++++++++
 .../samza/table/remote/RemoteTableProvider.java |  68 ++-
 .../apache/samza/table/utils/TableMetrics.java  |  77 ++++
 .../samza/table/utils/TableReadMetrics.java     |  54 ---
 .../samza/table/utils/TableWriteMetrics.java    |  60 ---
 .../impl/TestStreamTableJoinOperatorImpl.java   |   4 +-
 .../apache/samza/table/TestTableManager.java    |   4 +-
 .../samza/table/caching/TestCachingTable.java   |  15 +-
 .../descriptors/TestLocalTableDescriptor.java   |   4 +-
 .../table/remote/TestRemoteReadWriteTable.java  | 458 -------------------
 .../samza/table/remote/TestRemoteTable.java     | 456 ++++++++++++++++++
 .../descriptors/TestRemoteTableDescriptor.java  |   8 +-
 .../retry/TestRetriableTableFunctions.java      |   4 +-
 .../inmemory/TestInMemoryTableDescriptor.java   |   4 +-
 .../descriptors/TestRocksDbTableDescriptor.java |   4 +-
 .../samza/storage/kv/LocalReadWriteTable.java   | 154 -------
 .../samza/storage/kv/LocalReadableTable.java    | 108 -----
 .../org/apache/samza/storage/kv/LocalTable.java | 213 +++++++++
 .../samza/storage/kv/LocalTableProvider.java    |   7 +-
 .../storage/kv/TestLocalReadWriteTable.java     | 247 ----------
 .../storage/kv/TestLocalReadableTable.java      | 155 -------
 .../storage/kv/TestLocalTableProvider.java      |   2 +-
 .../samza/storage/kv/TestLocalTableRead.java    | 155 +++++++
 .../samza/storage/kv/TestLocalTableWrite.java   | 247 ++++++++++
 .../framework/StreamTaskIntegrationTest.java    |   2 +-
 .../apache/samza/test/table/TestLocalTable.java | 362 ---------------
 .../test/table/TestLocalTableEndToEnd.java      | 361 +++++++++++++++
 .../samza/test/table/TestRemoteTable.java       | 288 ------------
 .../test/table/TestRemoteTableEndToEnd.java     | 310 +++++++++++++
 51 files changed, 2509 insertions(+), 2699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
index cdf7404..8adfcea 100644
--- a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
@@ -25,8 +25,6 @@ import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 
 
 /**
@@ -63,17 +61,15 @@ public interface TaskContext {
   KeyValueStore<?, ?> getStore(String storeName);
 
   /**
-   * Gets the {@link Table} corresponding to the {@code tableId} for this task.
+   * Gets the {@link ReadWriteTable} corresponding to the {@code tableId} for this task.
    *
-   * The returned table should be cast with the concrete type parameters based on the configured table serdes, and
-   * whether it is {@link ReadWriteTable} or {@link ReadableTable}. E.g., if using string key and integer value
-   * serde for a writable table, it should be cast to a {@code ReadWriteTable<String, Integer>}.
-   *
-   * @param tableId id of the {@link Table} to get
-   * @return the {@link Table} associated with {@code tableId} for this task
+   * @param tableId id of the {@link ReadWriteTable} to get
+   * @param <K> the type of the key in this table
+   * @param <V> the type of the value in this table
+   * @return the {@link ReadWriteTable} associated with {@code tableId} for this task
    * @throws IllegalArgumentException if there is no table associated with {@code tableId}
    */
-  Table<?> getTable(String tableId);
+  <K, V> ReadWriteTable<K, V> getTable(String tableId);
 
   /**
    * Gets the {@link CallbackScheduler} for this task, which can be used to schedule a callback to be executed

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
index 083a1b5..ffb87a4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
@@ -19,9 +19,11 @@
 package org.apache.samza.table;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 
 /**
@@ -32,7 +34,51 @@ import org.apache.samza.storage.kv.Entry;
  * @param <V> the type of the value in this table
  */
 @InterfaceStability.Unstable
-public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
+public interface ReadWriteTable<K, V> extends Table {
+
+  /**
+   * Initializes the table during container initialization.
+   * Guaranteed to be invoked as the first operation on the table.
+   * @param context {@link Context} corresponding to this table
+   */
+  default void init(Context context) {
+  }
+
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Asynchronously gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return completableFuture for the requested value
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  CompletableFuture<V> getAsync(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Asynchronously gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return completableFuture for the requested entries
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
 
   /**
    * Updates the mapping of the specified key-value pair;
@@ -114,4 +160,9 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
    * Flushes the underlying store of this table, if applicable.
    */
   void flush();
+
+  /**
+   * Close the table and release any resources acquired
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
deleted file mode 100644
index 6c88fd3..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.KV;
-
-
-/**
- *
- * A table that supports get by one or more keys
- *
- * @param <K> the type of the record key in this table
- * @param <V> the type of the record value in this table
- */
-@InterfaceStability.Unstable
-public interface ReadableTable<K, V> extends Table<KV<K, V>> {
-  /**
-   * Initializes the table during container initialization.
-   * Guaranteed to be invoked as the first operation on the table.
-   * @param context {@link Context} corresponding to this table
-   */
-  default void init(Context context) {
-  }
-
-  /**
-   * Gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  V get(K key);
-
-  /**
-   * Asynchronously gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return completableFuture for the requested value
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  CompletableFuture<V> getAsync(K key);
-
-  /**
-   * Gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return a map of the keys that were found and their respective values.
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  Map<K, V> getAll(List<K> keys);
-
-  /**
-   * Asynchronously gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return completableFuture for the requested entries
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
-
-  /**
-   * Close the table and release any resources acquired
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/Table.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java b/samza-api/src/main/java/org/apache/samza/table/Table.java
index 234d15b..c454012 100644
--- a/samza-api/src/main/java/org/apache/samza/table/Table.java
+++ b/samza-api/src/main/java/org/apache/samza/table/Table.java
@@ -36,8 +36,6 @@ import org.apache.samza.task.InitableTask;
  * hybrid tables. For remote data sources, a {@code RemoteTable} provides optimized access with caching, rate-limiting,
  * and retry support.
  * <p>
- * Depending on the implementation, a {@link Table} can be a {@link ReadableTable} or a {@link ReadWriteTable}.
- * <p>
  * Use a {@link TableDescriptor} to specify the properties of a {@link Table}. For High Level API
  * {@link StreamApplication}s, use {@link StreamApplicationDescriptor#getTable} to obtain the {@link Table} instance for
  * the descriptor that can be used with the {@link MessageStream} operators like {@link MessageStream#sendTo(Table)}.

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 2dec989..36cad2e 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -34,10 +34,10 @@ public interface TableProvider {
   void init(Context context);
 
   /**
-   * Get an instance of the table for read/write operations
+   * Get an instance of the {@link ReadWriteTable}
    * @return the underlying table
    */
-  Table getTable();
+  ReadWriteTable getTable();
 
   /**
    * Shutdown the underlying table

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index 26c2ae3..52eca5f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -56,6 +56,7 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
    * @param value the value
    * @return this table descriptor instance
    */
+  @SuppressWarnings("unchecked")
   public D withConfig(String key, String value) {
     config.put(key, value);
     return (D) this;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index 1ebb580..1623710 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -39,6 +39,7 @@ import org.apache.samza.table.utils.SerdeUtils;
  * @param <V> the type of the value in this table
  * @param <D> the type of the concrete table descriptor
  */
+@SuppressWarnings("unchecked")
 abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
     extends BaseTableDescriptor<K, V, D> {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 7286004..4b15c47 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -235,6 +235,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
     if (!tagCreditsMap.isEmpty()) {
       RateLimiter defaultRateLimiter;
       try {
+        @SuppressWarnings("unchecked")
         Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
         Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
         defaultRateLimiter = ctor.newInstance(tagCreditsMap);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
index a7b66e5..338baf4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
+++ b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
@@ -54,6 +54,7 @@ public final class SerdeUtils {
    * @return deserialized object instance
    * @param <T> type of the object
    */
+  @SuppressWarnings("unchecked")
   public static <T> T deserialize(String name, String strObject) {
     try {
       byte [] bytes = Base64.getDecoder().decode(strObject);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
index ea9acbd..3235d5a 100644
--- a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -25,9 +25,9 @@ import java.util.Collections;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
 
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyMap;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index ec52f8a..a29c2b3 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
 import java.util.HashMap;
@@ -83,7 +83,7 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public Table getTable(String tableId) {
+  public <K, V> ReadWriteTable<K, V> getTable(String tableId) {
     return this.tableManager.getTable(tableId);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 0d39c1b..6d84b17 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
 
   SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
     this.sendToTableOpSpec = sendToTableOpSpec;
-    this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
+    this.table = context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 96f07d1..e3fc266 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -22,7 +22,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
@@ -42,11 +42,11 @@ import java.util.Collections;
 class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> {
 
   private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
-  private final ReadableTable<K, ?> table;
+  private final ReadWriteTable<K, ?> table;
 
   StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
     this.joinOpSpec = joinOpSpec;
-    this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableId());
+    this.table = context.getTaskContext().getTable(joinOpSpec.getTableId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
new file mode 100644
index 0000000..cef224d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.utils.TableMetrics;
+import org.apache.samza.util.HighResolutionClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for a concrete table implementation. It keeps the table id and a logger
+ * named after the concrete class and the table id; the clock and metrics are set in init().
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+abstract public class BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> {
+
+  protected final Logger logger;
+  protected final String tableId;
+  protected TableMetrics metrics;
+  protected HighResolutionClock clock;
+
+  /**
+   * Construct an instance
+   * @param tableId Id of the table, must be neither null nor empty
+   * @throws IllegalArgumentException if the table Id is null or empty
+   */
+  public BaseReadWriteTable(String tableId) {
+    // Short-circuit && so that a null Id fails the check instead of throwing NPE in isEmpty()
+    Preconditions.checkArgument(tableId != null && !tableId.isEmpty(), "Invalid table Id: %s", tableId);
+    this.tableId = tableId;
+    this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId);
+  }
+
+  /** Creates the clock and the metrics of this table from the given context. */
+  @Override
+  public void init(Context context) {
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    // When metrics timers are disabled the clock always reads 0
+    clock = metricsConfig.getMetricsTimerEnabled() ? () -> System.nanoTime() : () -> 0L;
+    metrics = new TableMetrics(context, this, tableId);
+  }
+
+  /** @return Id of this table */
+  public String getTableId() {
+    return tableId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
deleted file mode 100644
index 1dfd54c..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import com.google.common.base.Preconditions;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.utils.TableReadMetrics;
-import org.apache.samza.table.utils.TableWriteMetrics;
-import org.apache.samza.util.HighResolutionClock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Base class for all readable tables
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
-
-  protected final Logger logger;
-
-  protected final String tableId;
-
-  protected TableReadMetrics readMetrics;
-  protected TableWriteMetrics writeMetrics;
-
-  protected HighResolutionClock clock;
-
-  /**
-   * Construct an instance
-   * @param tableId Id of the table
-   */
-  public BaseReadableTable(String tableId) {
-    Preconditions.checkArgument(tableId != null & !tableId.isEmpty(),
-        String.format("Invalid table Id: %s", tableId));
-    this.tableId = tableId;
-    this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId);
-  }
-
-  @Override
-  public void init(Context context) {
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    clock = metricsConfig.getMetricsTimerEnabled()
-        ? () -> System.nanoTime()
-        : () -> 0L;
-
-    readMetrics = new TableReadMetrics(context, this, tableId);
-    if (this instanceof ReadWriteTable) {
-      writeMetrics = new TableWriteMetrics(context, this, tableId);
-    }
-  }
-
-  public String getTableId() {
-    return tableId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index d3ba771..5a3777e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -54,7 +54,7 @@ public class TableManager {
 
   static class TableCtx {
     private TableProvider tableProvider;
-    private Table table;
+    private ReadWriteTable table;
   }
 
   private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());
@@ -110,7 +110,7 @@ public class TableManager {
    * @param tableId Id of the table
    * @return table instance
    */
-  public Table getTable(String tableId) {
+  public ReadWriteTable getTable(String tableId) {
     Preconditions.checkState(initialized, "TableManager has not been initialized.");
 
     TableCtx ctx = tableContexts.get(tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index e63bf61..2fde79a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -23,9 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
 import java.util.ArrayList;
@@ -41,34 +40,32 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
 
 /**
- * A composite table incorporating a cache with a Samza table. The cache is
+ * A hybrid table incorporating a cache with a Samza table. The cache is
  * represented as a {@link ReadWriteTable}.
  *
- * The intented use case is to optimize the latency of accessing the actual table, eg.
+ * The intended use case is to optimize the latency of accessing the actual table, e.g.
  * remote tables, when eventual consistency between cache and table is acceptable.
  * The cache is expected to support TTL such that the values can be refreshed at some
  * point.
  *
- * If the actual table is read-write table, CachingTable supports both write-through
- * and write-around (writes bypassing cache) policies. For write-through policy, it
- * supports read-after-write semantics because the value is cached after written to
- * the table.
+ * {@link CachingTable} supports write-through and write-around (writes bypassing cache) policies.
+ * For write-through policy, it supports read-after-write semantics because the value is
+ * cached after being written to the table.
  *
- * Note that there is no synchronization in CachingTable because it is impossible to
+ * Note that there is no synchronization in {@link CachingTable} because it is impossible to
  * implement a critical section between table read/write and cache update in the async
  * code paths without serializing all async operations for the same keys. Given stale
- * data is a presumed trade off for using a cache for table, it should be acceptable
- * for the data in table and cache are out-of-sync. Moreover, unsynchronized operations
- * in CachingTable also deliver higher performance when there is contention.
+ * data is a presumed trade-off for using a cache with a table, it should be acceptable
+ * for the data in table and cache to be temporarily out-of-sync. Moreover, unsynchronized
+ * operations in {@link CachingTable} also deliver higher performance when there is contention.
  *
  * @param <K> type of the table key
  * @param <V> type of the table value
  */
-public class CachingTable<K, V> extends BaseReadableTable<K, V>
+public class CachingTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
-  private final ReadableTable<K, V> rdTable;
-  private final ReadWriteTable<K, V> rwTable;
+  private final ReadWriteTable<K, V> table;
   private final ReadWriteTable<K, V> cache;
   private final boolean isWriteAround;
 
@@ -76,10 +73,9 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   private AtomicLong hitCount = new AtomicLong();
   private AtomicLong missCount = new AtomicLong();
 
-  public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
+  public CachingTable(String tableId, ReadWriteTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
     super(tableId);
-    this.rdTable = table;
-    this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : null;
+    this.table = table;
     this.cache = cache;
     this.isWriteAround = isWriteAround;
   }
@@ -114,16 +110,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public V get(K key) {
     try {
       return getAsync(key).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<V> getAsync(K key) {
-    incCounter(readMetrics.numGets);
+    incCounter(metrics.numGets);
     V value = cache.get(key);
     if (value != null) {
       hitCount.incrementAndGet();
@@ -133,14 +127,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
     long startNs = clock.nanoTime();
     missCount.incrementAndGet();
 
-    return rdTable.getAsync(key).handle((result, e) -> {
+    return table.getAsync(key).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get the record for " + key, e);
         } else {
           if (result != null) {
             cache.put(key, result);
           }
-          updateTimer(readMetrics.getNs, clock.nanoTime() - startNs);
+          updateTimer(metrics.getNs, clock.nanoTime() - startNs);
           return result;
         }
       });
@@ -150,16 +144,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public Map<K, V> getAll(List<K> keys) {
     try {
       return getAllAsync(keys).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    incCounter(readMetrics.numGetAlls);
+    incCounter(metrics.numGetAlls);
     // Make a copy of entries which might be immutable
     Map<K, V> getAllResult = new HashMap<>();
     List<K> missingKeys = lookupCache(keys, getAllResult);
@@ -169,7 +161,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
     }
 
     long startNs = clock.nanoTime();
-    return rdTable.getAllAsync(missingKeys).handle((records, e) -> {
+    return table.getAllAsync(missingKeys).handle((records, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get records for " + keys, e);
         } else {
@@ -179,7 +171,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
                 .collect(Collectors.toList()));
             getAllResult.putAll(records);
           }
-          updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs);
+          updateTimer(metrics.getAllNs, clock.nanoTime() - startNs);
           return getAllResult;
         }
       });
@@ -189,20 +181,18 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public void put(K key, V value) {
     try {
       putAsync(key, value).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> putAsync(K key, V value) {
-    incCounter(writeMetrics.numPuts);
-    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
+    incCounter(metrics.numPuts);
+    Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
 
     long startNs = clock.nanoTime();
-    return rwTable.putAsync(key, value).handle((result, e) -> {
+    return table.putAsync(key, value).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e);
         } else if (!isWriteAround) {
@@ -212,7 +202,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
             cache.put(key, value);
           }
         }
-        updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.putNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -221,26 +211,24 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public void putAll(List<Entry<K, V>> records) {
     try {
       putAllAsync(records).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    incCounter(writeMetrics.numPutAlls);
+    incCounter(metrics.numPutAlls);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
-    return rwTable.putAllAsync(records).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
+    return table.putAllAsync(records).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to put records " + records, e);
         } else if (!isWriteAround) {
           cache.putAll(records);
         }
 
-        updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.putAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -249,25 +237,23 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public void delete(K key) {
     try {
       deleteAsync(key).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
-    incCounter(writeMetrics.numDeletes);
+    incCounter(metrics.numDeletes);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
-    return rwTable.deleteAsync(key).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
+    return table.deleteAsync(key).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to delete the record for " + key, e);
         } else if (!isWriteAround) {
           cache.delete(key);
         }
-        updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.deleteNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -283,33 +269,33 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
 
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    incCounter(writeMetrics.numDeleteAlls);
+    incCounter(metrics.numDeleteAlls);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
-    return rwTable.deleteAllAsync(keys).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
+    return table.deleteAllAsync(keys).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to delete the record for " + keys, e);
         } else if (!isWriteAround) {
           cache.deleteAll(keys);
         }
-        updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.deleteAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
 
   @Override
   public synchronized void flush() {
-    incCounter(writeMetrics.numFlushes);
+    incCounter(metrics.numFlushes);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
-    rwTable.flush();
-    updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
+    Preconditions.checkNotNull(table, "Cannot flush a read-only table: " + table);
+    table.flush();
+    updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
   }
 
   @Override
   public void close() {
-    this.cache.close();
-    this.rdTable.close();
+    cache.close();
+    table.close();
   }
 
   double hitRate() {

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index d835809..e533cf4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.BaseTableProvider;
@@ -47,18 +45,18 @@ public class CachingTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
 
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID);
-    ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
+    ReadWriteTable table = this.context.getTaskContext().getTable(realTableId);
 
     String cacheTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_TABLE_ID);
     ReadWriteTable cache;
 
     if (cacheTableId != null) {
-      cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
+      cache = this.context.getTaskContext().getTable(cacheTableId);
     } else {
       cache = createDefaultCacheTable(realTableId, tableConfig);
       defaultCaches.add(cache);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index b75a0bc..d8a5d9c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -23,7 +23,7 @@ import com.google.common.cache.Cache;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
  * @param <K> type of the key in the cache
  * @param <V> type of the value in the cache
  */
-public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V>
+public class GuavaCacheTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
   private final Cache<K, V> cache;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index e45719e..042d3c7 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.BaseTableProvider;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
 import org.apache.samza.table.utils.SerdeUtils;
@@ -44,7 +44,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
deleted file mode 100644
index 80c2cac..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * Remote store backed read writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
-    implements ReadWriteTable<K, V> {
-
-  protected final TableWriteFunction<K, V> writeFn;
-  protected final TableRateLimiter writeRateLimiter;
-
-  public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
-      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
-      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
-    super(tableId, readFn, readRateLimiter, tableExecutor, callbackExecutor);
-    Preconditions.checkNotNull(writeFn, "null write function");
-    this.writeFn = writeFn;
-    this.writeRateLimiter = writeRateLimiter;
-  }
-
-  @Override
-  public void init(Context context) {
-    super.init(context);
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
-      writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
-    }
-  }
-
-  @Override
-  public void put(K key, V value) {
-    try {
-      putAsync(key, value).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAsync(K key, V value) {
-    Preconditions.checkNotNull(key);
-    if (value == null) {
-      return deleteAsync(key);
-    }
-
-    return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs)
-        .exceptionally(e -> {
-            throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
-          });
-  }
-
-  @Override
-  public void putAll(List<Entry<K, V>> entries) {
-    try {
-      putAllAsync(entries).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    Preconditions.checkNotNull(records);
-    if (records.isEmpty()) {
-      return CompletableFuture.completedFuture(null);
-    }
-
-    List<K> deleteKeys = records.stream()
-        .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
-
-    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
-        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
-
-    List<Entry<K, V>> putRecords = records.stream()
-        .filter(e -> e.getValue() != null).collect(Collectors.toList());
-
-    // Return the combined future
-    return CompletableFuture.allOf(
-        deleteFuture,
-        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs))
-        .exceptionally(e -> {
-            String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
-            throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
-          });
-  }
-
-  @Override
-  public void delete(K key) {
-    try {
-      deleteAsync(key).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
-    Preconditions.checkNotNull(key);
-    return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs)
-        .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
-          });
-  }
-
-  @Override
-  public void deleteAll(List<K> keys) {
-    try {
-      deleteAllAsync(keys).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    Preconditions.checkNotNull(keys);
-    if (keys.isEmpty()) {
-      return CompletableFuture.completedFuture(null);
-    }
-
-    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs)
-        .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
-          });
-  }
-
-  @Override
-  public void flush() {
-    try {
-      incCounter(writeMetrics.numFlushes);
-      long startNs = clock.nanoTime();
-      writeFn.flush();
-      updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
-    } catch (Exception e) {
-      String errMsg = "Failed to flush remote store";
-      logger.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  @Override
-  public void close() {
-    writeFn.close();
-    super.close();
-  }
-
-  /**
-   * Execute an async request given a table record (key+value)
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param value value of the table record
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
-            .thenCompose((r) -> method.apply(key, value))
-        : method.apply(key, value);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Execute an async request given a collection of table records
-   * @param rateLimiter helper for rate limiting
-   * @param records list of records
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
-      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
-      Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
-            .thenCompose((r) -> method.apply(records))
-        : method.apply(records);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  @VisibleForTesting
-  public TableWriteFunction<K, V> getWriteFn() {
-    return writeFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter getWriteRateLimiter() {
-    return writeRateLimiter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
deleted file mode 100644
index 84a05b8..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Objects;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
- * <p>
- * Many stream-processing applications require to look-up data from remote data sources eg: databases,
- * web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be
- * naturally modeled as a join between the incoming stream and a {@link RemoteReadableTable}.
- * <p>
- * Example use-cases include:
- * <ul>
- *  <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
- *  <li> Scoring page views with impressions services. </li>
- *  <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
- * </ul>
- * <p>
- * A {@link RemoteReadableTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
- * which encapsulate the functionality of reading and writing data to the remote service. These provide a
- * pluggable means to specify I/O operations on the table. While the base implementation merely delegates to
- * these reader and writer functions, sub-classes of {@link RemoteReadableTable} may provide rich functionality like
- * caching or throttling on top of them.
- *
- * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter.
- * Optionally, an executor can be specified for invoking the future callbacks which otherwise are
- * executed on the threads of the underlying native data store client. This could be useful when
- * application might execute long-running operations upon future completions; another use case is to increase
- * throughput with more parallelism in the callback executions.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
-
-  protected final ExecutorService callbackExecutor;
-  protected final ExecutorService tableExecutor;
-  protected final TableReadFunction<K, V> readFn;
-  protected final TableRateLimiter<K, V> readRateLimiter;
-
-  /**
-   * Construct a RemoteReadableTable instance
-   * @param tableId table id
-   * @param readFn {@link TableReadFunction} for read operations
-   * @param rateLimiter helper for rate limiting
-   * @param tableExecutor executor for issuing async requests
-   * @param callbackExecutor executor for invoking async callbacks
-   */
-  public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn,
-      TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) {
-    super(tableId);
-    Preconditions.checkNotNull(readFn, "null read function");
-    this.readFn = readFn;
-    this.readRateLimiter = rateLimiter;
-    this.callbackExecutor = callbackExecutor;
-    this.tableExecutor = tableExecutor;
-  }
-
-  @Override
-  public void init(Context context) {
-    super.init(context);
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
-      readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
-    }
-  }
-
-  @Override
-  public V get(K key) {
-    try {
-      return getAsync(key).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<V> getAsync(K key) {
-    Preconditions.checkNotNull(key);
-    return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs)
-        .handle((result, e) -> {
-            if (e != null) {
-              throw new SamzaException("Failed to get the records for " + key, e);
-            }
-            if (result == null) {
-              incCounter(readMetrics.numMissedLookups);
-            }
-            return result;
-          });
-  }
-
-  @Override
-  public Map<K, V> getAll(List<K> keys) {
-    try {
-      return getAllAsync(keys).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    Preconditions.checkNotNull(keys);
-    if (keys.isEmpty()) {
-      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
-    }
-    return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs)
-        .handle((result, e) -> {
-            if (e != null) {
-              throw new SamzaException("Failed to get the records for " + keys, e);
-            }
-            result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
-            return result;
-          });
-  }
-
-  /**
-   * Execute an async request given a table key
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
-            .thenCompose((r) -> method.apply(key))
-        : method.apply(key);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Execute an async request given a collection of table keys
-   * @param rateLimiter helper for rate limiting
-   * @param keys collection of keys
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
-      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
-            .thenCompose((r) -> method.apply(keys))
-        : method.apply(keys);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Complete the pending execution and update timer
-   * @param ioFuture the future to be executed
-   * @param startNs start time in nanosecond
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
-    if (callbackExecutor != null) {
-      ioFuture.thenApplyAsync(r -> {
-          updateTimer(timer, clock.nanoTime() - startNs);
-          return r;
-        }, callbackExecutor);
-    } else {
-      ioFuture.thenApply(r -> {
-          updateTimer(timer, clock.nanoTime() - startNs);
-          return r;
-        });
-    }
-    return ioFuture;
-  }
-
-  @Override
-  public void close() {
-    readFn.close();
-  }
-
-  @VisibleForTesting
-  public ExecutorService getCallbackExecutor() {
-    return callbackExecutor;
-  }
-
-  @VisibleForTesting
-  public ExecutorService getTableExecutor() {
-    return tableExecutor;
-  }
-
-  @VisibleForTesting
-  public TableReadFunction<K, V> getReadFn() {
-    return readFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter<K, V> getReadRateLimiter() {
-    return readRateLimiter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
new file mode 100644
index 0000000..5b9b289
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A Samza {@link ReadWriteTable} backed by a remote data-store or service.
+ * <p>
+ * Many stream-processing applications need to look up data from remote data sources (e.g. databases,
+ * web-services, RPC systems) to process messages in the stream. Such access to adjunct datasets can be
+ * naturally modeled as a join between the incoming stream and a table.
+ * <p>
+ * Example use-cases include:
+ * <ul>
+ *  <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
+ *  <li> Scoring page views with impressions services. </li>
+ *  <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
+ * </ul>
+ * <p>
+ * A {@link RemoteTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
+ * which encapsulate the functionality of reading and writing data to the remote service. These provide a
+ * pluggable means to specify I/O operations on the table.
+ *
+ * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter.
+ * Optionally, an executor can be specified for invoking the future callbacks which otherwise are
+ * executed on the threads of the underlying native data store client. This could be useful when the
+ * application might execute long-running operations upon future completion; another use case is to increase
+ * throughput with more parallelism in the callback executions.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
+    implements ReadWriteTable<K, V> {
+
+  protected final ExecutorService callbackExecutor;
+  protected final ExecutorService tableExecutor;
+  protected final TableReadFunction<K, V> readFn;
+  protected final TableWriteFunction<K, V> writeFn;
+  protected final TableRateLimiter<K, V> readRateLimiter;
+  protected final TableRateLimiter writeRateLimiter;
+
+  /**
+   * Construct a RemoteTable instance
+   * @param tableId table id
+   * @param readFn {@link TableReadFunction} for read operations
+   * @param writeFn {@link TableWriteFunction} for write operations
+   * @param readRateLimiter helper for read rate limiting
+   * @param writeRateLimiter helper for write rate limiting
+   * @param tableExecutor executor for issuing async requests
+   * @param callbackExecutor executor for invoking async callbacks
+   */
+  public RemoteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
+      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
+      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
+    super(tableId);
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.readFn = readFn;
+    this.writeFn = writeFn;
+    this.readRateLimiter = readRateLimiter;
+    this.writeRateLimiter = writeRateLimiter;
+    this.tableExecutor = tableExecutor;
+    this.callbackExecutor = callbackExecutor;
+  }
+
+  @Override
+  public void init(Context context) {
+    super.init(context);
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+      readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+      if (writeRateLimiter != null) {
+        writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+      }
+    }
+  }
+
+  @Override
+  public V get(K key) {
+    try {
+      return getAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    Preconditions.checkNotNull(key);
+    return execute(readRateLimiter, key, readFn::getAsync, metrics.numGets, metrics.getNs)
+        .handle((result, e) -> {
+            if (e != null) {
+              throw new SamzaException("Failed to get the records for " + key, e);
+            }
+            if (result == null) {
+              incCounter(metrics.numMissedLookups);
+            }
+            return result;
+          });
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    try {
+      return getAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    Preconditions.checkNotNull(keys);
+    if (keys.isEmpty()) {
+      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+    }
+    return execute(readRateLimiter, keys, readFn::getAllAsync, metrics.numGetAlls, metrics.getAllNs)
+        .handle((result, e) -> {
+            if (e != null) {
+              throw new SamzaException("Failed to get the records for " + keys, e);
+            }
+            result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
+            return result;
+          });
+  }
+
+  @Override
+  public void put(K key, V value) {
+    try {
+      putAsync(key, value).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(key);
+    if (value == null) {
+      return deleteAsync(key);
+    }
+
+    return execute(writeRateLimiter, key, value, writeFn::putAsync, metrics.numPuts, metrics.putNs)
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
+          });
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    try {
+      putAllAsync(entries).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(records);
+    if (records.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    List<K> deleteKeys = records.stream()
+        .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
+
+    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
+        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
+
+    List<Entry<K, V>> putRecords = records.stream()
+        .filter(e -> e.getValue() != null).collect(Collectors.toList());
+
+    // Return the combined future
+    return CompletableFuture.allOf(
+        deleteFuture,
+        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, metrics.numPutAlls, metrics.putAllNs))
+        .exceptionally(e -> {
+            String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
+            throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
+          });
+  }
+
+  @Override
+  public void delete(K key) {
+    try {
+      deleteAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(key);
+    return execute(writeRateLimiter, key, writeFn::deleteAsync, metrics.numDeletes, metrics.deleteNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
+          });
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    try {
+      deleteAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(keys);
+    if (keys.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, metrics.numDeleteAlls, metrics.deleteAllNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
+          });
+  }
+
+  @Override
+  public void flush() {
+    if (writeFn != null) {
+      try {
+        incCounter(metrics.numFlushes);
+        long startNs = clock.nanoTime();
+        writeFn.flush();
+        updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
+      } catch (Exception e) {
+        String errMsg = "Failed to flush remote store";
+        logger.error(errMsg, e);
+        throw new SamzaException(errMsg, e);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    readFn.close();
+    if (writeFn != null) {
+      writeFn.close();
+    }
+  }
+
+  /**
+   * Execute an async request given a table key
+   * @param rateLimiter helper for rate limiting
+   * @param key key of the table record
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
+      K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+        .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
+        .thenCompose((r) -> method.apply(key))
+        : method.apply(key);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  /**
+   * Execute an async request given a collection of table keys
+   * @param rateLimiter helper for rate limiting
+   * @param keys collection of keys
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
+      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    if (!rateLimiter.isRateLimited()) {
+      return completeExecution(method.apply(keys), startNs, timer);
+    }
+    CompletableFuture<T> throttledFuture = CompletableFuture
+        .runAsync(() -> rateLimiter.throttle(keys), tableExecutor).thenCompose(v -> method.apply(keys));
+    return completeExecution(throttledFuture, startNs, timer);
+  }
+
+  /**
+   * Complete the pending execution and update timer
+   * @param ioFuture the future to be executed
+   * @param startNs start time in nanosecond
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
+    // Record latency in a dependent stage (thenApply only runs on normal completion); that
+    // dependent stage is discarded and the caller gets the original ioFuture back.
+    Function<T, T> recordLatency = result -> {
+        updateTimer(timer, clock.nanoTime() - startNs);
+        return result;
+      };
+    if (callbackExecutor == null) {
+      ioFuture.thenApply(recordLatency);
+    } else {
+      ioFuture.thenApplyAsync(recordLatency, callbackExecutor);
+    }
+    return ioFuture;
+  }
+
+  /**
+   * Execute an async request given a table record (key+value)
+   * @param rateLimiter helper for rate limiting
+   * @param key key of the table record
+   * @param value value of the table record
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
+      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    if (!rateLimiter.isRateLimited()) {
+      return completeExecution(method.apply(key, value), startNs, timer);
+    }
+    CompletableFuture<Void> throttledFuture = CompletableFuture
+        .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor).thenCompose(v -> method.apply(key, value));
+    return completeExecution(throttledFuture, startNs, timer);
+  }
+
+  /**
+   * Execute an async request given a collection of table records
+   * @param rateLimiter helper for rate limiting
+   * @param records list of records
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
+      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
+      Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    if (!rateLimiter.isRateLimited()) {
+      return completeExecution(method.apply(records), startNs, timer);
+    }
+    CompletableFuture<Void> throttledFuture = CompletableFuture
+        .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor).thenCompose(v -> method.apply(records));
+    return completeExecution(throttledFuture, startNs, timer);
+  }
+
+  @VisibleForTesting
+  public ExecutorService getCallbackExecutor() {
+    return callbackExecutor; // can be null: completeExecution() null-checks this field before use
+  }
+
+  @VisibleForTesting
+  public ExecutorService getTableExecutor() {
+    return tableExecutor; // executor the throttle step is submitted to in execute()/executeRecords()
+  }
+
+  @VisibleForTesting
+  public TableReadFunction<K, V> getReadFn() {
+    return readFn; // the same function instance that close() closes
+  }
+
+  @VisibleForTesting
+  public TableRateLimiter<K, V> getReadRateLimiter() {
+    return readRateLimiter; // test-only accessor; the field is returned as-is
+  }
+
+  @VisibleForTesting
+  public TableWriteFunction<K, V> getWriteFn() {
+    return writeFn; // can be null: close() null-checks this field
+  }
+
+  @VisibleForTesting
+  public TableRateLimiter getWriteRateLimiter() {
+    return writeRateLimiter; // NOTE(review): raw return type, unlike getReadRateLimiter()'s TableRateLimiter<K, V>
+  }
+}


[3/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 93a0521..8b6bc1a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -21,8 +21,7 @@ package org.apache.samza.table.remote;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -45,9 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class RemoteTableProvider extends BaseTableProvider {
 
-  private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
-
-  private boolean readOnly;
+  private final List<RemoteTable<?, ?>> tables = new ArrayList<>();
 
   /**
    * Map of tableId -> executor service for async table IO and callbacks. The same executors
@@ -63,21 +60,13 @@ public class RemoteTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public void init(Context context) {
-    super.init(context);
-    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
-    this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
-  }
-
-  @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
 
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
 
-    RemoteReadableTable table;
-
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
 
+    // Read part
     TableReadFunction readFn = getReadFn(tableConfig);
     RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
     if (rateLimiter != null) {
@@ -86,34 +75,29 @@ public class RemoteTableProvider extends BaseTableProvider {
     TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
     TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
 
-    TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
-    TableRateLimiter writeRateLimiter = null;
-
     TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
-    TableRetryPolicy writeRetryPolicy = null;
-
-    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
-      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
-          Thread thread = new Thread(runnable);
-          thread.setName("table-retry-executor");
-          thread.setDaemon(true);
-          return thread;
-        });
-    }
-
     if (readRetryPolicy != null) {
+      if (retryExecutor == null) {
+        retryExecutor = createRetryExecutor();
+      }
       readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
     }
 
-    TableWriteFunction writeFn = getWriteFn(tableConfig);
-
     boolean isRateLimited = readRateLimiter.isRateLimited();
-    if (!readOnly) {
-      writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+
+    // Write part
+    TableWriteFunction writeFn = getWriteFn(tableConfig);
+    TableRateLimiter writeRateLimiter = null;
+    TableRetryPolicy writeRetryPolicy = null;
+    if (writeFn != null) {
+      TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
       writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
       isRateLimited |= writeRateLimiter.isRateLimited();
       writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
       if (writeRetryPolicy != null) {
+        if (retryExecutor == null) {
+          retryExecutor = createRetryExecutor();
+        }
         writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
       }
     }
@@ -140,13 +124,8 @@ public class RemoteTableProvider extends BaseTableProvider {
             }));
     }
 
-    if (readOnly) {
-      table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
-          tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    } else {
-      table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
-          writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    }
+    RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter,
+        writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
 
     TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
     if (readRetryPolicy != null) {
@@ -192,5 +171,14 @@ public class RemoteTableProvider extends BaseTableProvider {
     }
     return writeFn;
   }
+
+  private ScheduledExecutorService createRetryExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(runnable -> { // lambda acts as the thread factory
+        Thread thread = new Thread(runnable);
+        thread.setName("table-retry-executor");
+        thread.setDaemon(true); // daemon thread: does not keep the JVM alive on its own
+        return thread;
+      });
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
new file mode 100644
index 0000000..df6833e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Utility class that contains the default set of read and write metrics for a table.
+ */
+public class TableMetrics {
+
+  // Read metrics
+  public final Timer getNs;
+  public final Timer getAllNs;
+  public final Counter numGets;
+  public final Counter numGetAlls;
+  public final Counter numMissedLookups;
+  // Write metrics
+  public final Counter numPuts;
+  public final Timer putNs;
+  public final Counter numPutAlls;
+  public final Timer putAllNs;
+  public final Counter numDeletes;
+  public final Timer deleteNs;
+  public final Counter numDeleteAlls;
+  public final Timer deleteAllNs;
+  public final Counter numFlushes;
+  public final Timer flushNs;
+
+  /**
+   * Creates every counter and timer through a {@link TableMetricsUtil} built from the arguments.
+   *
+   * @param context {@link Context} for this task
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public TableMetrics(Context context, Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+    // Read metrics
+    numGets = tableMetricsUtil.newCounter("num-gets");
+    getNs = tableMetricsUtil.newTimer("get-ns");
+    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+    // Write metrics
+    numPuts = tableMetricsUtil.newCounter("num-puts");
+    putNs = tableMetricsUtil.newTimer("put-ns");
+    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+    numDeletes = tableMetricsUtil.newCounter("num-deletes");
+    deleteNs = tableMetricsUtil.newTimer("delete-ns");
+    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+    numFlushes = tableMetricsUtil.newCounter("num-flushes");
+    flushNs = tableMetricsUtil.newTimer("flush-ns");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
deleted file mode 100644
index e77fcfd..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class TableReadMetrics {
-
-  public final Timer getNs;
-  public final Timer getAllNs;
-  public final Counter numGets;
-  public final Counter numGetAlls;
-  public final Counter numMissedLookups;
-
-  /**
-   * Constructor based on container and task container context
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public TableReadMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    numGets = tableMetricsUtil.newCounter("num-gets");
-    getNs = tableMetricsUtil.newTimer("get-ns");
-    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
-    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
-    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
deleted file mode 100644
index bf65b74..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-public class TableWriteMetrics {
-
-  public final Counter numPuts;
-  public final Timer putNs;
-  public final Counter numPutAlls;
-  public final Timer putAllNs;
-  public final Counter numDeletes;
-  public final Timer deleteNs;
-  public final Counter numDeleteAlls;
-  public final Timer deleteAllNs;
-  public final Counter numFlushes;
-  public final Timer flushNs;
-
-  /**
-   * Utility class that contains the default set of write metrics.
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public TableWriteMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    numPuts = tableMetricsUtil.newCounter("num-puts");
-    putNs = tableMetricsUtil.newTimer("put-ns");
-    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
-    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
-    numDeletes = tableMetricsUtil.newCounter("num-deletes");
-    deleteNs = tableMetricsUtil.newTimer("delete-ns");
-    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
-    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
-    numFlushes = tableMetricsUtil.newCounter("num-flushes");
-    flushNs = tableMetricsUtil.newTimer("flush-ns");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 4112c8b..8fd161b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
@@ -71,7 +71,7 @@ public class TestStreamTableJoinOperatorImpl {
             return record.getKey();
           }
         });
-    ReadableTable table = mock(ReadableTable.class);
+    ReadWriteTable table = mock(ReadWriteTable.class);
     when(table.get("1")).thenReturn("r1");
     when(table.get("2")).thenReturn(null);
     Context context = new MockContext();

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index a3b1963..e60b6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -49,12 +49,12 @@ public class TestTableManager {
 
   public static class DummyTableProviderFactory implements TableProviderFactory {
 
-    static ReadableTable table;
+    static ReadWriteTable table;
     static TableProvider tableProvider;
 
     @Override
     public TableProvider getTableProvider(String tableId) {
-      table = mock(ReadableTable.class);
+      table = mock(ReadWriteTable.class);
       tableProvider = mock(TableProvider.class);
       when(tableProvider.getTable()).thenReturn(table);
       return tableProvider;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 5a19767..c304bfd 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -35,11 +35,10 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
@@ -138,11 +137,11 @@ public class TestCachingTable {
     return Pair.of(cacheTable, cacheStore);
   }
 
-  private void initTables(ReadableTable ... tables) {
+  private void initTables(ReadWriteTable ... tables) {
     initTables(false, tables);
   }
 
-  private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+  private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) {
     Map<String, String> config = new HashMap<>();
     if (isTimerMetricsDisabled) {
       config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -242,7 +241,7 @@ public class TestCachingTable {
 
   @Test
   public void testNonexistentKeyInTable() {
-    ReadableTable<String, String> table = mock(ReadableTable.class);
+    ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
     doReturn(CompletableFuture.completedFuture(null)).when(table).getAsync(any());
     ReadWriteTable<String, String> cache = getMockCache().getLeft();
     CachingTable<String, String> cachingTable = new CachingTable<>("myTable", table, cache, false);
@@ -255,7 +254,7 @@ public class TestCachingTable {
 
   @Test
   public void testKeyEviction() {
-    ReadableTable<String, String> table = mock(ReadableTable.class);
+    ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
     doReturn(CompletableFuture.completedFuture("3")).when(table).getAsync(any());
     ReadWriteTable<String, String> cache = mock(ReadWriteTable.class);
 
@@ -283,7 +282,7 @@ public class TestCachingTable {
     TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+    final RemoteTable<String, String> remoteTable = new RemoteTable<>(
         tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 
@@ -402,7 +401,7 @@ public class TestCachingTable {
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
 
-    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+    final RemoteTable<String, String> remoteTable = new RemoteTable<>(
         tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
index 486295a..a764a8b 100644
--- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
@@ -30,7 +30,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 import org.junit.Test;
@@ -143,7 +143,7 @@ public class TestLocalTableDescriptor {
     }
 
     @Override
-    public Table getTable() {
+    public ReadWriteTable getTable() {
       throw new SamzaException("Not implemented");
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
deleted file mode 100644
index d7733a8..0000000
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestRemoteReadWriteTable {
-  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
-
-  public static Context getMockContext() {
-    Context context = new MockContext();
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
-    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
-    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
-    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
-    return context;
-  }
-
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
-    return getTable(tableId, readFn, writeFn, null);
-  }
-
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
-    RemoteReadableTable<K, V> table;
-
-    TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
-    TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
-    doReturn(true).when(readRateLimiter).isRateLimited();
-    doReturn(true).when(writeRateLimiter).isRateLimited();
-
-    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
-
-    if (writeFn == null) {
-      table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
-    } else {
-      table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
-    }
-
-    Context context = getMockContext();
-
-    table.init(context);
-
-    return (T) table;
-  }
-
-  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
-    String tableId = "testGet-" + sync + error + retry;
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    // Sync is backed by async so needs to mock the async method
-    CompletableFuture<String> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-      if (!retry) {
-        doReturn(future).when(readFn).getAsync(anyString());
-      } else {
-        final int [] times = new int[] {0};
-        doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
-            .when(readFn).getAsync(anyString());
-      }
-    } else {
-      future = CompletableFuture.completedFuture("bar");
-      doReturn(future).when(readFn).getAsync(anyString());
-    }
-    if (retry) {
-      doReturn(true).when(readFn).isRetriable(any());
-      TableRetryPolicy policy = new TableRetryPolicy();
-      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
-    }
-    RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
-    Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
-    verify(table.readRateLimiter, times(1)).throttle(anyString());
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    doTestGet(true, false, false);
-  }
-
-  @Test
-  public void testGetAsync() throws Exception {
-    doTestGet(false, false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testGetAsyncError() throws Exception {
-    doTestGet(false, true, false);
-  }
-
-  @Test
-  public void testGetAsyncErrorRetried() throws Exception {
-    doTestGet(false, true, true);
-  }
-
-  @Test
-  public void testGetMultipleTables() {
-    TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
-    TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
-
-    // Sync is backed by async so needs to mock the async method
-    doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
-    doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
-
-    RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
-    RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
-
-    CompletableFuture<String> future1 = table1.getAsync("foo1");
-    CompletableFuture<String> future2 = table2.getAsync("foo2");
-
-    CompletableFuture.allOf(future1, future2)
-        .thenAccept(u -> {
-            Assert.assertEquals(future1.join(), "bar1");
-            Assert.assertEquals(future2.join(), "bar1");
-          });
-  }
-
-  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
-    String tableId = "testPut-" + sync + error + isDelete + retry;
-    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
-    TableWriteFunction<String, String> writeFn = mockWriteFn;
-    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
-    CompletableFuture<Void> failureFuture = new CompletableFuture();
-    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
-    if (!error) {
-      if (isDelete) {
-        doReturn(successFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doReturn(successFuture).when(writeFn).putAsync(any(), any());
-      }
-    } else if (!retry) {
-      if (isDelete) {
-        doReturn(failureFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
-      }
-    } else {
-      doReturn(true).when(writeFn).isRetriable(any());
-      final int [] times = new int[] {0};
-      if (isDelete) {
-        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
-      }
-      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
-    }
-    RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
-    if (sync) {
-      table.put("foo", isDelete ? null : "bar");
-    } else {
-      table.putAsync("foo", isDelete ? null : "bar").get();
-    }
-    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
-    if (isDelete) {
-      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
-    } else {
-      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
-      Assert.assertEquals("bar", valCaptor.getValue());
-    }
-    Assert.assertEquals("foo", keyCaptor.getValue());
-    if (isDelete) {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString());
-    } else {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
-    }
-  }
-
-  @Test
-  public void testPut() throws Exception {
-    doTestPut(true, false, false, false);
-  }
-
-  @Test
-  public void testPutDelete() throws Exception {
-    doTestPut(true, false, true, false);
-  }
-
-  @Test
-  public void testPutAsync() throws Exception {
-    doTestPut(false, false, false, false);
-  }
-
-  @Test
-  public void testPutAsyncDelete() throws Exception {
-    doTestPut(false, false, true, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testPutAsyncError() throws Exception {
-    doTestPut(false, true, false, false);
-  }
-
-  @Test
-  public void testPutAsyncErrorRetried() throws Exception {
-    doTestPut(false, true, false, true);
-  }
-
-  private void doTestDelete(boolean sync, boolean error) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).deleteAsync(any());
-    ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
-    if (sync) {
-      table.delete("foo");
-    } else {
-      table.deleteAsync("foo").get();
-    }
-    verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
-    Assert.assertEquals("foo", argCaptor.getValue());
-    verify(table.writeRateLimiter, times(1)).throttle(anyString());
-  }
-
-  @Test
-  public void testDelete() throws Exception {
-    doTestDelete(true, false);
-  }
-
-  @Test
-  public void testDeleteAsync() throws Exception {
-    doTestDelete(false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testDeleteAsyncError() throws Exception {
-    doTestDelete(false, true);
-  }
-
-  private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    Map<String, String> res = new HashMap<>();
-    res.put("foo1", "bar1");
-    if (!partial) {
-      res.put("foo2", "bar2");
-    }
-    CompletableFuture<Map<String, String>> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(res);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(readFn).getAllAsync(any());
-    RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
-    Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
-        : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
-    verify(table.readRateLimiter, times(1)).throttle(anyCollection());
-  }
-
-  @Test
-  public void testGetAll() throws Exception {
-    doTestGetAll(true, false, false);
-  }
-
-  @Test
-  public void testGetAllAsync() throws Exception {
-    doTestGetAll(false, false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testGetAllAsyncError() throws Exception {
-    doTestGetAll(false, true, false);
-  }
-
-  // Partial result is an acceptable scenario
-  @Test
-  public void testGetAllPartialResult() throws Exception {
-    doTestGetAll(false, false, true);
-  }
-
-  public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).putAllAsync(any());
-    if (hasDelete) {
-      doReturn(future).when(writeFn).deleteAllAsync(any());
-    }
-    List<Entry<String, String>> entries = Arrays.asList(
-        new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
-    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
-    if (sync) {
-      table.putAll(entries);
-    } else {
-      table.putAllAsync(entries).get();
-    }
-    verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
-    if (hasDelete) {
-      ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
-      verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
-      Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
-      Assert.assertEquals(1, argCaptor.getValue().size());
-      Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
-      verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
-    } else {
-      Assert.assertEquals(entries, argCaptor.getValue());
-    }
-    verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
-  }
-
-  @Test
-  public void testPutAll() throws Exception {
-    doTestPutAll(true, false, false);
-  }
-
-  @Test
-  public void testPutAllHasDelete() throws Exception {
-    doTestPutAll(true, false, true);
-  }
-
-  @Test
-  public void testPutAllAsync() throws Exception {
-    doTestPutAll(false, false, false);
-  }
-
-  @Test
-  public void testPutAllAsyncHasDelete() throws Exception {
-    doTestPutAll(false, false, true);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testPutAllAsyncError() throws Exception {
-    doTestPutAll(false, true, false);
-  }
-
-  public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).deleteAllAsync(any());
-    List<String> keys = Arrays.asList("foo1", "foo2");
-    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
-    if (sync) {
-      table.deleteAll(keys);
-    } else {
-      table.deleteAllAsync(keys).get();
-    }
-    verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
-    Assert.assertEquals(keys, argCaptor.getValue());
-    verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
-  }
-
-  @Test
-  public void testDeleteAll() throws Exception {
-    doTestDeleteAll(true, false);
-  }
-
-  @Test
-  public void testDeleteAllAsync() throws Exception {
-    doTestDeleteAll(false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testDeleteAllAsyncError() throws Exception {
-    doTestDeleteAll(false, true);
-  }
-
-  @Test
-  public void testFlush() {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
-    table.flush();
-    verify(writeFn, times(1)).flush();
-  }
-
-  @Test
-  public void testGetWithCallbackExecutor() throws Exception {
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    // Sync is backed by async so needs to mock the async method
-    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
-    RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
-        Executors.newSingleThreadExecutor());
-    Thread testThread = Thread.currentThread();
-
-    table.getAsync("foo").thenAccept(result -> {
-        Assert.assertEquals("bar", result);
-        // Must be executed on the executor thread
-        Assert.assertNotSame(testThread, Thread.currentThread());
-      });
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
new file mode 100644
index 0000000..ae96d86
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteTable {
+  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+  public static Context getMockContext() {
+    Context context = new MockContext();
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
+    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
+    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
+    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
+    return context;
+  }
+
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
+    return getTable(tableId, readFn, writeFn, null);
+  }
+
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
+    RemoteTable<K, V> table;
+
+    TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
+    TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
+    doReturn(true).when(readRateLimiter).isRateLimited();
+    doReturn(true).when(writeRateLimiter).isRateLimited();
+
+    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
+
+    table = new RemoteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
+
+    Context context = getMockContext();
+
+    table.init(context);
+
+    return (T) table;
+  }
+
+  /** Runs get/getAsync against a mocked read function; with error and retry only the first call fails. */
+  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+    String tableId = "testGet-" + sync + error + retry;
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    // Sync is backed by async so needs to mock the async method
+    CompletableFuture<String> future;
+    if (error) {
+      future = new CompletableFuture<>();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+      if (!retry) {
+        doReturn(future).when(readFn).getAsync(anyString());
+      } else {
+        final int[] attempts = new int[] {0};
+        doAnswer(args -> attempts[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
+            .when(readFn).getAsync(anyString());
+      }
+    } else {
+      future = CompletableFuture.completedFuture("bar");
+      doReturn(future).when(readFn).getAsync(anyString());
+    }
+    if (retry) {
+      doReturn(true).when(readFn).isRetriable(any());
+      readFn = new RetriableReadFunction<>(new TableRetryPolicy(), readFn, schedExec);
+    }
+    RemoteTable<String, String> table = getTable(tableId, readFn, null);
+    Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
+    verify(table.readRateLimiter, times(1)).throttle(anyString());
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    doTestGet(true, false, false);
+  }
+
+  @Test
+  public void testGetAsync() throws Exception {
+    doTestGet(false, false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testGetAsyncError() throws Exception {
+    doTestGet(false, true, false);
+  }
+
+  @Test
+  public void testGetAsyncErrorRetried() throws Exception {
+    doTestGet(false, true, true);
+  }
+
+  /** Two tables backed by separate read functions must each return their own function's value. */
+  @Test
+  public void testGetMultipleTables() {
+    TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
+    TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
+    // Stub the async method of EACH read function with a distinct value
+    doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
+    doReturn(CompletableFuture.completedFuture("bar2")).when(readFn2).getAsync(anyString());
+
+    RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
+    RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+
+    CompletableFuture<String> future1 = table1.getAsync("foo1");
+    CompletableFuture<String> future2 = table2.getAsync("foo2");
+    // join() so that a failure inside the callback propagates and actually fails the test
+    CompletableFuture.allOf(future1, future2)
+        .thenAccept(u -> {
+            Assert.assertEquals("bar1", future1.join());
+            Assert.assertEquals("bar2", future2.join());
+          }).join();
+  }
+
+  /**
+   * Drives put/putAsync against a mocked write function (a null value is expected to reach deleteAsync)
+   * and verifies the forwarded arguments, the number of calls and the write rate limiter invocation.
+   */
+  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+    String tableId = "testPut-" + sync + error + isDelete + retry;
+    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+    TableWriteFunction<String, String> writeFn = mockWriteFn;
+    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+    CompletableFuture<Void> failureFuture = new CompletableFuture<>();
+    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+    if (!error || !retry) {
+      // No retry wrapper in this case: every call yields the same outcome
+      CompletableFuture<Void> outcome = error ? failureFuture : successFuture;
+      if (isDelete) {
+        doReturn(outcome).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(outcome).when(writeFn).putAsync(any(), any());
+      }
+    } else {
+      // Fail the first attempt only, then succeed, and wrap the mock in the retriable function
+      doReturn(true).when(writeFn).isRetriable(any());
+      final int[] attempts = new int[] {0};
+      if (isDelete) {
+        doAnswer(args -> attempts[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doAnswer(args -> attempts[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+      }
+      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
+    }
+    RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+    if (sync) {
+      table.put("foo", isDelete ? null : "bar");
+    } else {
+      table.putAsync("foo", isDelete ? null : "bar").get();
+    }
+    // The retry wrapper is only installed for error && retry; the mock is then expected to see two calls
+    int expectedCalls = error && retry ? 2 : 1;
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
+    if (isDelete) {
+      verify(mockWriteFn, times(expectedCalls)).deleteAsync(keyCaptor.capture());
+      verify(table.writeRateLimiter, times(1)).throttle(anyString());
+    } else {
+      verify(mockWriteFn, times(expectedCalls)).putAsync(keyCaptor.capture(), valCaptor.capture());
+      Assert.assertEquals("bar", valCaptor.getValue());
+      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    }
+    Assert.assertEquals("foo", keyCaptor.getValue());
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    doTestPut(true, false, false, false);
+  }
+
+  @Test
+  public void testPutDelete() throws Exception {
+    doTestPut(true, false, true, false);
+  }
+
+  @Test
+  public void testPutAsync() throws Exception {
+    doTestPut(false, false, false, false);
+  }
+
+  @Test
+  public void testPutAsyncDelete() throws Exception {
+    doTestPut(false, false, true, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testPutAsyncError() throws Exception {
+    doTestPut(false, true, false, false);
+  }
+
+  @Test
+  public void testPutAsyncErrorRetried() throws Exception {
+    doTestPut(false, true, false, true);
+  }
+
+  private void doTestDelete(boolean sync, boolean error) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).deleteAsync(any());
+    ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
+    if (sync) {
+      table.delete("foo");
+    } else {
+      table.deleteAsync("foo").get();
+    }
+    verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
+    Assert.assertEquals("foo", argCaptor.getValue());
+    verify(table.writeRateLimiter, times(1)).throttle(anyString());
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    doTestDelete(true, false);
+  }
+
+  @Test
+  public void testDeleteAsync() throws Exception {
+    doTestDelete(false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testDeleteAsyncError() throws Exception {
+    doTestDelete(false, true);
+  }
+
+  private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    Map<String, String> res = new HashMap<>();
+    res.put("foo1", "bar1");
+    if (!partial) {
+      res.put("foo2", "bar2");
+    }
+    CompletableFuture<Map<String, String>> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(res);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(readFn).getAllAsync(any());
+    RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
+    Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
+        : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
+    verify(table.readRateLimiter, times(1)).throttle(anyCollection());
+  }
+
+  @Test
+  public void testGetAll() throws Exception {
+    doTestGetAll(true, false, false);
+  }
+
+  @Test
+  public void testGetAllAsync() throws Exception {
+    doTestGetAll(false, false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testGetAllAsyncError() throws Exception {
+    doTestGetAll(false, true, false);
+  }
+
+  // Partial result is an acceptable scenario
+  @Test
+  public void testGetAllPartialResult() throws Exception {
+    doTestGetAll(false, false, true);
+  }
+
+  public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).putAllAsync(any());
+    if (hasDelete) {
+      doReturn(future).when(writeFn).deleteAllAsync(any());
+    }
+    List<Entry<String, String>> entries = Arrays.asList(
+        new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
+    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+    if (sync) {
+      table.putAll(entries);
+    } else {
+      table.putAllAsync(entries).get();
+    }
+    verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
+    if (hasDelete) {
+      ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
+      verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
+      Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
+      Assert.assertEquals(1, argCaptor.getValue().size());
+      Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
+      verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+    } else {
+      Assert.assertEquals(entries, argCaptor.getValue());
+    }
+    verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
+  }
+
+  @Test
+  public void testPutAll() throws Exception {
+    doTestPutAll(true, false, false);  // sync=true, error=false, hasDelete=false
+  }
+
+  @Test
+  public void testPutAllHasDelete() throws Exception {
+    doTestPutAll(true, false, true);  // sync=true, error=false, hasDelete=true
+  }
+
+  @Test
+  public void testPutAllAsync() throws Exception {
+    doTestPutAll(false, false, false);  // sync=false, error=false, hasDelete=false
+  }
+
+  @Test
+  public void testPutAllAsyncHasDelete() throws Exception {
+    doTestPutAll(false, false, true);  // sync=false, error=false, hasDelete=true
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testPutAllAsyncError() throws Exception {
+    doTestPutAll(false, true, false);  // sync=false, error=true, hasDelete=false
+  }
+
+  public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).deleteAllAsync(any());
+    List<String> keys = Arrays.asList("foo1", "foo2");
+    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+    if (sync) {
+      table.deleteAll(keys);
+    } else {
+      table.deleteAllAsync(keys).get();
+    }
+    verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
+    Assert.assertEquals(keys, argCaptor.getValue());
+    verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+  }
+
+  @Test
+  public void testDeleteAll() throws Exception {
+    doTestDeleteAll(true, false);  // sync=true, error=false
+  }
+
+  @Test
+  public void testDeleteAllAsync() throws Exception {
+    doTestDeleteAll(false, false);  // sync=false, error=false
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testDeleteAllAsyncError() throws Exception {
+    doTestDeleteAll(false, true);  // sync=false, error=true
+  }
+
+  @Test
+  public void testFlush() {
+    TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
+    RemoteTable<String, String> remoteTable = getTable("testFlush", mock(TableReadFunction.class), writer);
+    remoteTable.flush();
+    verify(writer, times(1)).flush();  // flushing the table must reach the write function exactly once
+  }
+
+  @Test
+  public void testGetWithCallbackExecutor() throws Exception {
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    // Sync is backed by async so needs to mock the async method
+    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+    RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
+        Executors.newSingleThreadExecutor());
+    Thread testThread = Thread.currentThread();
+    // Block on the chained future: an assertion failing inside the callback would otherwise be silently dropped
+    table.getAsync("foo").thenAccept(result -> {
+        Assert.assertEquals("bar", result);
+        // Must be executed on the executor thread
+        Assert.assertNotSame(testThread, Thread.currentThread());
+      }).get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 3d8e36f..907242f 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -36,17 +36,17 @@ import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.RemoteTableProvider;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -207,8 +207,8 @@ public class TestRemoteTableDescriptor {
     RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId());
     provider.init(createMockContext(desc));
     Table table = provider.getTable();
-    Assert.assertTrue(table instanceof RemoteReadWriteTable);
-    RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+    Assert.assertTrue(table instanceof RemoteTable);
+    RemoteTable rwTable = (RemoteTable) table;
     if (numRateLimitOps > 0) {
       Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null);
       Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 5116cab..050ea55 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.TestRemoteReadWriteTable;
+import org.apache.samza.table.remote.TestRemoteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 import org.junit.Test;
 
@@ -54,7 +54,7 @@ public class TestRetriableTableFunctions {
 
   public TableMetricsUtil getMetricsUtil(String tableId) {
     Table table = mock(Table.class);
-    Context context = TestRemoteReadWriteTable.getMockContext();
+    Context context = TestRemoteTable.getMockContext();
     return new TableMetricsUtil(context, table, tableId);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
index ddb79ba..fc9ce76 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -19,7 +19,7 @@
 package org.apache.samza.storage.kv.inmemory;
 
 import java.util.Map;
-import junit.framework.Assert;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
@@ -28,7 +28,9 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.LocalTableProviderFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+
 import org.junit.Test;
+import org.junit.Assert;
 
 
 public class TestInMemoryTableDescriptor {

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index 62fb3da..319fb0f 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -19,7 +19,7 @@
 package org.apache.samza.storage.kv.descriptors;
 
 import java.util.Map;
-import junit.framework.Assert;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
@@ -30,7 +30,9 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.LocalTableProviderFactory;
 import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+
 import org.junit.Test;
+import org.junit.Assert;
 
 public class TestRocksDbTableDescriptor {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
deleted file mode 100644
index eae6bb0..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * A store backed readable and writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
-    implements ReadWriteTable<K, V> {
-
-  /**
-   * Constructs an instance of {@link LocalReadWriteTable}
-   * @param tableId the table Id
-   * @param kvStore the backing store
-   */
-  public LocalReadWriteTable(String tableId, KeyValueStore kvStore) {
-    super(tableId, kvStore);
-  }
-
-  @Override
-  public void put(K key, V value) {
-    if (value != null) {
-      instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
-    } else {
-      delete(key);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAsync(K key, V value) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      put(key, value);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void putAll(List<Entry<K, V>> entries) {
-    List<Entry<K, V>> toPut = new LinkedList<>();
-    List<K> toDelete = new LinkedList<>();
-    entries.forEach(e -> {
-        if (e.getValue() != null) {
-          toPut.add(e);
-        } else {
-          toDelete.add(e.getKey());
-        }
-      });
-
-    if (!toPut.isEmpty()) {
-      instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut));
-    }
-
-    if (!toDelete.isEmpty()) {
-      deleteAll(toDelete);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      putAll(entries);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void delete(K key) {
-    instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      delete(key);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void deleteAll(List<K> keys) {
-    instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    CompletableFuture<Void> future = new CompletableFuture();
-    try {
-      deleteAll(keys);
-      future.complete(null);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void flush() {
-    instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
-  }
-
-  private interface Func0 {
-    void apply();
-  }
-
-  private void instrument(Counter counter, Timer timer, Func0 func) {
-    incCounter(counter);
-    long startNs = clock.nanoTime();
-    func.apply();
-    updateTimer(timer, clock.nanoTime() - startNs);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
deleted file mode 100644
index 29ddb15..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import com.google.common.base.Supplier;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A store backed readable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
-
-  protected final KeyValueStore<K, V> kvStore;
-
-  /**
-   * Constructs an instance of {@link LocalReadableTable}
-   * @param tableId the table Id
-   * @param kvStore the backing store
-   */
-  public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
-    super(tableId);
-    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
-    this.kvStore = kvStore;
-  }
-
-  @Override
-  public V get(K key) {
-    V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
-    if (result == null) {
-      incCounter(readMetrics.numMissedLookups);
-    }
-    return result;
-  }
-
-  @Override
-  public CompletableFuture<V> getAsync(K key) {
-    CompletableFuture<V> future = new CompletableFuture();
-    try {
-      future.complete(get(key));
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public Map<K, V> getAll(List<K> keys) {
-    Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
-    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
-    return result;
-  }
-
-  @Override
-  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    CompletableFuture<Map<K, V>> future = new CompletableFuture();
-    try {
-      future.complete(getAll(keys));
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void close() {
-    // The KV store is not closed here as it may still be needed by downstream operators,
-    // it will be closed by the SamzaContainer
-  }
-
-  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
-    incCounter(counter);
-    long startNs = clock.nanoTime();
-    T result = func.get();
-    updateTimer(timer, clock.nanoTime() - startNs);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
new file mode 100644
index 0000000..d9767b6
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.BaseReadWriteTable;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A store backed readable and writable table
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalTable<K, V> extends BaseReadWriteTable<K, V> {
+
+  protected final KeyValueStore<K, V> kvStore;
+
+  /**
+   * Constructs an instance of {@link LocalTable}
+   * @param tableId the table Id
+   * @param kvStore the backing store
+   */
+  public LocalTable(String tableId, KeyValueStore kvStore) {
+    super(tableId);
+    // checkNotNull throws on a null store and otherwise hands back its argument
+    this.kvStore = Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+  }
+
+  @Override
+  public V get(K key) {
+    V value = instrument(metrics.numGets, metrics.getNs, () -> kvStore.get(key));
+    if (value == null) {
+      incCounter(metrics.numMissedLookups);  // a null lookup result is counted as a miss
+    }
+    return value;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    CompletableFuture<V> future = new CompletableFuture<>();
+    try {
+      future.complete(get(key));  // the lookup runs synchronously inside this call
+    } catch (Exception e) {
+      future.completeExceptionally(e);  // report the failure through the future instead of throwing
+    }
+    return future;
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> found = instrument(metrics.numGetAlls, metrics.getAllNs, () -> kvStore.getAll(keys));
+    found.values().stream().filter(Objects::isNull).forEach(n -> incCounter(metrics.numMissedLookups));
+    return found;  // any null values in the map have each been counted as one missed lookup
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    CompletableFuture<Map<K, V>> future = new CompletableFuture<>();
+    try {
+      future.complete(getAll(keys));  // the batch lookup runs synchronously inside this call
+    } catch (Exception e) {
+      future.completeExceptionally(e);  // report the failure through the future instead of throwing
+    }
+    return future;
+  }
+
+  @Override
+  public void put(K key, V value) {
+    if (value == null) {
+      delete(key);  // a null value is handled as a delete of the key
+    } else {
+      instrument(metrics.numPuts, metrics.putNs, () -> kvStore.put(key, value));
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      put(key, value);  // the write runs synchronously inside this call
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);  // report the failure through the future instead of throwing
+    }
+    return future;
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    List<Entry<K, V>> upserts = new LinkedList<>();
+    List<K> removals = new LinkedList<>();
+    for (Entry<K, V> entry : entries) {
+      if (entry.getValue() == null) {
+        removals.add(entry.getKey());  // a null value marks the key for deletion
+      } else {
+        upserts.add(entry);
+      }
+    }
+    // Puts are applied before deletes; an empty batch is skipped entirely
+    if (!upserts.isEmpty()) {
+      instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(upserts));
+    }
+    // deleteAll() records its own counter and timer
+    if (!removals.isEmpty()) {
+      deleteAll(removals);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      putAll(entries);  // the batch write runs synchronously inside this call
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);  // report the failure through the future instead of throwing
+    }
+    return future;
+  }
+
+  @Override
+  public void delete(K key) {
+    instrument(metrics.numDeletes, metrics.deleteNs, () -> kvStore.delete(key));  // counted and timed
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      delete(key);  // the delete runs synchronously inside this call
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);  // report the failure through the future instead of throwing
+    }
+    return future;
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    instrument(metrics.numDeleteAlls, metrics.deleteAllNs, () -> kvStore.deleteAll(keys));  // counted and timed
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      deleteAll(keys);  // the batch delete runs synchronously inside this call
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);  // report the failure through the future instead of throwing
+    }
+    return future;
+  }
+
+  @Override
+  public void flush() {
+    instrument(metrics.numFlushes, metrics.flushNs, () -> kvStore.flush());  // counted and timed
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators;
+    // it will be closed by the SamzaContainer
+  }
+
+  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
+    incCounter(counter);  // the call is counted before it runs
+    long startNs = clock.nanoTime();
+    T result = func.get();
+    updateTimer(timer, clock.nanoTime() - startNs);  // skipped if func throws, so failed calls are not timed
+    return result;
+  }
+
+  private interface Func0 {  // no-argument, no-result action accepted by the void overload of instrument()
+    void apply();
+  }
+
+  private void instrument(Counter counter, Timer timer, Func0 func) {
+    incCounter(counter);  // the call is counted before it runs
+    long startNs = clock.nanoTime();
+    func.apply();
+    updateTimer(timer, clock.nanoTime() - startNs);  // skipped if func throws, so failed calls are not timed
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
index 3be61d0..5099a7e 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
@@ -20,8 +20,7 @@ package org.apache.samza.storage.kv;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.BaseTableProvider;
 
 /**
@@ -53,10 +52,10 @@ public class LocalTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
     Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId);
-    ReadableTable table = new LocalReadWriteTable(tableId, kvStore);
+    ReadWriteTable table = new LocalTable(tableId, kvStore);
     table.init(this.context);
     return table;
   }


[2/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
deleted file mode 100644
index 044fab4..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalReadWriteTable {
-
-  public static final String TABLE_ID = "t1";
-
-  private Timer putNs;
-  private Timer putAllNs;
-  private Timer deleteNs;
-  private Timer deleteAllNs;
-  private Timer flushNs;
-  private Counter numPuts;
-  private Counter numPutAlls;
-  private Counter numDeletes;
-  private Counter numDeleteAlls;
-  private Counter numFlushes;
-  private Timer putCallbackNs;
-  private Timer deleteCallbackNs;
-
-  private MetricsRegistry metricsRegistry;
-
-  private KeyValueStore kvStore;
-
-  @Before
-  public void setUp() {
-
-    putNs = new Timer("");
-    putAllNs = new Timer("");
-    deleteNs = new Timer("");
-    deleteAllNs = new Timer("");
-    flushNs = new Timer("");
-    numPuts = new Counter("");
-    numPutAlls = new Counter("");
-    numDeletes = new Counter("");
-    numDeleteAlls = new Counter("");
-    numFlushes = new Counter("");
-    putCallbackNs = new Timer("");
-    deleteCallbackNs = new Timer("");
-
-    metricsRegistry = mock(MetricsRegistry.class);
-    String groupName = LocalReadWriteTable.class.getSimpleName();
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
-
-    kvStore = mock(KeyValueStore.class);
-  }
-
-  @Test
-  public void testPut() throws Exception {
-    ReadWriteTable table = createTable(false);
-    table.put("k1", "v1");
-    table.putAsync("k2", "v2").get();
-    table.putAsync("k3", null).get();
-    verify(kvStore, times(2)).put(any(), any());
-    verify(kvStore, times(1)).delete(any());
-    Assert.assertEquals(2, numPuts.getCount());
-    Assert.assertEquals(1, numDeletes.getCount());
-    Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
-    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testPutAll() throws Exception {
-    ReadWriteTable table = createTable(false);
-    List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
-    table.putAll(entries);
-    table.putAllAsync(entries).get();
-    verify(kvStore, times(2)).putAll(any());
-    verify(kvStore, times(2)).deleteAll(any());
-    Assert.assertEquals(2, numPutAlls.getCount());
-    Assert.assertEquals(2, numDeleteAlls.getCount());
-    Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testDelete() throws Exception {
-    ReadWriteTable table = createTable(false);
-    table.delete("");
-    table.deleteAsync("").get();
-    verify(kvStore, times(2)).delete(any());
-    Assert.assertEquals(2, numDeletes.getCount());
-    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testDeleteAll() throws Exception {
-    ReadWriteTable table = createTable(false);
-    table.deleteAll(Collections.emptyList());
-    table.deleteAllAsync(Collections.emptyList()).get();
-    verify(kvStore, times(2)).deleteAll(any());
-    Assert.assertEquals(2, numDeleteAlls.getCount());
-    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testFlush() {
-    ReadWriteTable table = createTable(false);
-    table.flush();
-    table.flush();
-    verify(kvStore, times(2)).flush();
-    Assert.assertEquals(2, numFlushes.getCount());
-    Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testTimerDisabled() throws Exception {
-    ReadWriteTable table = createTable(true);
-    table.put("", "");
-    table.putAsync("", "").get();
-    table.putAll(Collections.emptyList());
-    table.putAllAsync(Collections.emptyList()).get();
-    table.delete("");
-    table.deleteAsync("").get();
-    table.deleteAll(Collections.emptyList());
-    table.deleteAllAsync(Collections.emptyList()).get();
-    table.flush();
-    Assert.assertEquals(1, numFlushes.getCount());
-    Assert.assertEquals(2, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(2, numDeletes.getCount());
-    Assert.assertEquals(2, numDeleteAlls.getCount());
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  private LocalReadWriteTable createTable(boolean isTimerDisabled) {
-    Map<String, String> config = new HashMap<>();
-    if (isTimerDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
-    }
-    Context context = mock(Context.class);
-    JobContext jobContext = mock(JobContext.class);
-    when(context.getJobContext()).thenReturn(jobContext);
-    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
-    ContainerContext containerContext = mock(ContainerContext.class);
-    when(context.getContainerContext()).thenReturn(containerContext);
-    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
-    LocalReadWriteTable table =  new LocalReadWriteTable("t1", kvStore);
-    table.init(context);
-
-    return table;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
deleted file mode 100644
index e1c82d9..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadableTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-public class TestLocalReadableTable {
-
-  public static final String TABLE_ID = "t1";
-
-  private List<String> keys;
-  private Map<String, String> values;
-
-  private Timer getNs;
-  private Timer getAllNs;
-  private Counter numGets;
-  private Counter numGetAlls;
-  private Timer getCallbackNs;
-  private Counter numMissedLookups;
-
-  private MetricsRegistry metricsRegistry;
-
-  private KeyValueStore kvStore;
-
-  @Before
-  public void setUp() {
-    keys = Arrays.asList("k1", "k2", "k3");
-
-    values = new HashMap<>();
-    values.put("k1", "v1");
-    values.put("k2", "v2");
-    values.put("k3", null);
-
-    kvStore = mock(KeyValueStore.class);
-    when(kvStore.get("k1")).thenReturn("v1");
-    when(kvStore.get("k2")).thenReturn("v2");
-    when(kvStore.getAll(keys)).thenReturn(values);
-
-    getNs = new Timer("");
-    getAllNs = new Timer("");
-    numGets = new Counter("");
-    numGetAlls = new Counter("");
-    getCallbackNs = new Timer("");
-    numMissedLookups = new Counter("");
-
-    metricsRegistry = mock(MetricsRegistry.class);
-    String groupName = LocalReadableTable.class.getSimpleName();
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    ReadableTable table = createTable(false);
-    Assert.assertEquals("v1", table.get("k1"));
-    Assert.assertEquals("v2", table.getAsync("k2").get());
-    Assert.assertNull(table.get("k3"));
-    verify(kvStore, times(3)).get(any());
-    Assert.assertEquals(3, numGets.getCount());
-    Assert.assertEquals(1, numMissedLookups.getCount());
-    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, numGetAlls.getCount());
-    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testGetAll() throws Exception {
-    ReadableTable table = createTable(false);
-    Assert.assertEquals(values, table.getAll(keys));
-    Assert.assertEquals(values, table.getAllAsync(keys).get());
-    verify(kvStore, times(2)).getAll(any());
-    Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
-    Assert.assertEquals(2, numMissedLookups.getCount());
-    Assert.assertEquals(3, numGetAlls.getCount());
-    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, numGets.getCount());
-    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testTimerDisabled() throws Exception {
-    ReadableTable table = createTable(true);
-    table.get("");
-    table.getAsync("").get();
-    table.getAll(keys);
-    table.getAllAsync(keys).get();
-    Assert.assertEquals(2, numGets.getCount());
-    Assert.assertEquals(4, numMissedLookups.getCount());
-    Assert.assertEquals(2, numGetAlls.getCount());
-    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  private LocalReadableTable createTable(boolean isTimerDisabled) {
-    Map<String, String> config = new HashMap<>();
-    if (isTimerDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
-    }
-    Context context = mock(Context.class);
-    JobContext jobContext = mock(JobContext.class);
-    when(context.getJobContext()).thenReturn(jobContext);
-    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
-    ContainerContext containerContext = mock(ContainerContext.class);
-    when(context.getContainerContext()).thenReturn(containerContext);
-    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
-    LocalReadableTable table =  new LocalReadableTable("t1", kvStore);
-    table.init(context);
-
-    return table;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
index 5367931..263ab56 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.storage.kv;
 
-import junit.framework.Assert;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
@@ -28,6 +27,7 @@ import org.apache.samza.context.TaskContext;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
+import org.junit.Assert;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
new file mode 100644
index 0000000..0fd4539
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestLocalTableRead {
+
+  public static final String TABLE_ID = "t1";
+
+  private List<String> keys;
+  private Map<String, String> values;
+
+  private Timer getNs;
+  private Timer getAllNs;
+  private Counter numGets;
+  private Counter numGetAlls;
+  private Timer getCallbackNs;
+  private Counter numMissedLookups;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  @Before
+  public void setUp() {
+    keys = Arrays.asList("k1", "k2", "k3");
+
+    values = new HashMap<>();
+    values.put("k1", "v1");
+    values.put("k2", "v2");
+    values.put("k3", null);
+
+    kvStore = mock(KeyValueStore.class);
+    when(kvStore.get("k1")).thenReturn("v1");
+    when(kvStore.get("k2")).thenReturn("v2");
+    when(kvStore.getAll(keys)).thenReturn(values);
+
+    getNs = new Timer("");
+    getAllNs = new Timer("");
+    numGets = new Counter("");
+    numGetAlls = new Counter("");
+    getCallbackNs = new Timer("");
+    numMissedLookups = new Counter("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalTable.class.getSimpleName();
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    ReadWriteTable table = createTable(false);
+    Assert.assertEquals("v1", table.get("k1"));
+    Assert.assertEquals("v2", table.getAsync("k2").get());
+    Assert.assertNull(table.get("k3"));
+    verify(kvStore, times(3)).get(any());
+    Assert.assertEquals(3, numGets.getCount());
+    Assert.assertEquals(1, numMissedLookups.getCount());
+    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGetAlls.getCount());
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testGetAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    Assert.assertEquals(values, table.getAll(keys));
+    Assert.assertEquals(values, table.getAllAsync(keys).get());
+    verify(kvStore, times(2)).getAll(any());
+    Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
+    Assert.assertEquals(2, numMissedLookups.getCount());
+    Assert.assertEquals(3, numGetAlls.getCount());
+    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGets.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception {
+    ReadWriteTable table = createTable(true);
+    table.get("");
+    table.getAsync("").get();
+    table.getAll(keys);
+    table.getAllAsync(keys).get();
+    Assert.assertEquals(2, numGets.getCount());
+    Assert.assertEquals(4, numMissedLookups.getCount());
+    Assert.assertEquals(2, numGetAlls.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  private LocalTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    LocalTable table =  new LocalTable("t1", kvStore);
+    table.init(context);
+
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
new file mode 100644
index 0000000..80eb99f
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadWriteTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalTableWrite {
+
+  public static final String TABLE_ID = "t1";
+
+  private Timer putNs;
+  private Timer putAllNs;
+  private Timer deleteNs;
+  private Timer deleteAllNs;
+  private Timer flushNs;
+  private Counter numPuts;
+  private Counter numPutAlls;
+  private Counter numDeletes;
+  private Counter numDeleteAlls;
+  private Counter numFlushes;
+  private Timer putCallbackNs;
+  private Timer deleteCallbackNs;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  @Before
+  public void setUp() {
+
+    putNs = new Timer("");
+    putAllNs = new Timer("");
+    deleteNs = new Timer("");
+    deleteAllNs = new Timer("");
+    flushNs = new Timer("");
+    numPuts = new Counter("");
+    numPutAlls = new Counter("");
+    numDeletes = new Counter("");
+    numDeleteAlls = new Counter("");
+    numFlushes = new Counter("");
+    putCallbackNs = new Timer("");
+    deleteCallbackNs = new Timer("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalTable.class.getSimpleName();
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
+
+    kvStore = mock(KeyValueStore.class);
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.put("k1", "v1");
+    table.putAsync("k2", "v2").get();
+    table.putAsync("k3", null).get();
+    verify(kvStore, times(2)).put(any(), any());
+    verify(kvStore, times(1)).delete(any());
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(1, numDeletes.getCount());
+    Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testPutAll() throws Exception { // sync + async putAll; entry k2 has a null value
+    ReadWriteTable table = createTable(false);
+    List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
+    table.putAll(entries);
+    table.putAllAsync(entries).get(); // block on the async result before verifying
+    verify(kvStore, times(2)).putAll(any());
+    verify(kvStore, times(2)).deleteAll(any()); // presumably caused by null-valued k2 - confirm in LocalTable
+    Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0); // timers were not disabled
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // everything below must stay untouched
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testDelete() throws Exception { // one sync and one async delete of the same key
+    ReadWriteTable table = createTable(false);
+    table.delete("");
+    table.deleteAsync("").get(); // block on the async result before verifying
+    verify(kvStore, times(2)).delete(any());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); // timers were not disabled
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // everything below must stay untouched
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testDeleteAll() throws Exception { // sync + async deleteAll, both with an empty key list
+    ReadWriteTable table = createTable(false);
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get(); // block on the async result before verifying
+    verify(kvStore, times(2)).deleteAll(any()); // empty lists are still expected to reach the store
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); // timers were not disabled
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // everything below must stay untouched
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testFlush() { // two flushes: each must reach the store and the flush metrics
+    ReadWriteTable table = createTable(false);
+    table.flush();
+    table.flush();
+    verify(kvStore, times(2)).flush();
+    Assert.assertEquals(2, numFlushes.getCount());
+    Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0); // timers were not disabled
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // everything below must stay untouched
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception { // counters must advance, timers must not
+    ReadWriteTable table = createTable(true); // true => metrics timer config set to "false"
+    table.put("", "");
+    table.putAsync("", "").get();
+    table.putAll(Collections.emptyList());
+    table.putAllAsync(Collections.emptyList()).get();
+    table.delete("");
+    table.deleteAsync("").get();
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    table.flush();
+    Assert.assertEquals(1, numFlushes.getCount());
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount()); // putAll with an empty list is expected not to be counted
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); // no latency may be recorded from here on
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  private LocalTable createTable(boolean isTimerDisabled) {
+    // Job settings stay empty unless the caller asks for the metrics timers to be switched off.
+    Map<String, String> jobSettings = new HashMap<>();
+    if (isTimerDisabled) {
+      jobSettings.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    JobContext mockJobContext = mock(JobContext.class);
+    when(mockJobContext.getConfig()).thenReturn(new MapConfig(jobSettings));
+    ContainerContext mockContainerContext = mock(ContainerContext.class);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+    // The top-level context only hands out the two stubbed sub-contexts above.
+    Context mockContext = mock(Context.class);
+    when(mockContext.getJobContext()).thenReturn(mockJobContext);
+    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+    LocalTable localTable = new LocalTable("t1", kvStore);
+    localTable.init(mockContext);
+    return localTable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2137a46..ab650f2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -229,7 +229,7 @@ public class StreamTaskIntegrationTest {
 
     @Override
     public void init(Context context) throws Exception {
-      profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+      profileViewTable = context.getTaskContext().getTable("profile-view-store");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
deleted file mode 100644
index b447493..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.context.Context;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * This test class tests sendTo() and join() for local tables
- */
-public class TestLocalTable extends AbstractIntegrationTestHarness {
-
-  @Test
-  public void testSendTo() throws Exception {
-
-    int count = 10;
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    MyMapFunction mapFn = new MyMapFunction();
-
-    final StreamApplication app = appDesc -> {
-
-      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
-          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
-      appDesc.getInputStream(isd)
-          .map(mapFn)
-          .sendTo(table);
-    };
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    for (int i = 0; i < partitionCount; i++) {
-      MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
-      assertEquals(count, mapFnCopy.received.size());
-      mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
-    }
-  }
-
-  static class StreamTableJoinApp implements StreamApplication {
-    static List<PageView> received = new LinkedList<>();
-    static List<EnrichedPageView> joined = new LinkedList<>();
-
-    @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
-      Table<KV<Integer, Profile>> table = appDesc.getTable(
-          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-      appDesc.getInputStream(profileISD)
-          .map(m -> new KV(m.getMemberId(), m))
-          .sendTo(table);
-
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDesc.getInputStream(pageViewISD)
-          .map(pv -> {
-              received.add(pv);
-              return pv;
-            })
-          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
-          .join(table, new PageViewToProfileJoinFunction())
-          .sink((m, collector, coordinator) -> joined.add(m));
-    }
-  }
-
-  @Test
-  public void testStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.samza.bootstrap", "true");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
-    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
-    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
-  }
-
-  static class DualStreamTableJoinApp implements StreamApplication {
-    static List<Profile> sentToProfileTable1 = new LinkedList<>();
-    static List<Profile> sentToProfileTable2 = new LinkedList<>();
-    static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
-    static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
-
-    @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
-      KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
-      KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
-
-      PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
-      PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
-
-      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
-
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
-      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
-      MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
-      MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
-
-      profileStream1
-          .map(m -> {
-              sentToProfileTable1.add(m);
-              return new KV(m.getMemberId(), m);
-            })
-          .sendTo(profileTable);
-      profileStream2
-          .map(m -> {
-              sentToProfileTable2.add(m);
-              return new KV(m.getMemberId(), m);
-            })
-          .sendTo(profileTable);
-
-      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
-      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
-      MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
-      MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
-
-      pageViewStream1
-          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
-          .join(profileTable, joinFn1)
-          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
-
-      pageViewStream2
-          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
-          .join(profileTable, joinFn2)
-          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
-    }
-  }
-
-  @Test
-  public void testDualStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile1.samza.system", "test");
-    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile1.samza.bootstrap", "true");
-    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile2.samza.system", "test");
-    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile2.samza.bootstrap", "true");
-    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView1.samza.system", "test");
-    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView2.samza.system", "test");
-    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
-    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
-    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
-  }
-
-  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
-
-    configs.put(JobConfig.JOB_NAME(), "test-table-job");
-    configs.put(JobConfig.PROCESSOR_ID(), "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
-    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
-
-    // For intermediate streams
-    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
-    configs.put("systems.kafka.samza.key.serde", "int");
-    configs.put("systems.kafka.samza.msg.serde", "json");
-    configs.put("systems.kafka.default.stream.replication.factor", "1");
-    configs.put("job.default.system", "kafka");
-
-    configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
-    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
-
-    return configs;
-  }
-
-  private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
-    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
-
-    private transient List<Profile> received;
-    private transient ReadableTable table;
-
-    @Override
-    public void init(Context context) {
-      table = (ReadableTable) context.getTaskContext().getTable("t1");
-      this.received = new ArrayList<>();
-
-      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
-    }
-
-    @Override
-    public KV<Integer, Profile> apply(Profile profile) {
-      received.add(profile);
-      return new KV(profile.getMemberId(), profile);
-    }
-
-    public static MyMapFunction getMapFunctionByTask(String taskName) {
-      return taskToMapFunctionMap.get(taskName);
-    }
-  }
-
-  @Test
-  public void testWithLowLevelApi() throws Exception {
-
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
-    configs.put("streams.PageView.partitionCount", String.valueOf(4));
-    configs.put("task.inputs", "test.PageView");
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-  }
-
-  static public class MyTaskApplication implements TaskApplication {
-    @Override
-    public void describe(TaskApplicationDescriptor appDescriptor) {
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDescriptor
-          .withInputStream(pageViewISD)
-          .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
-          .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-    }
-  }
-
-  static public class MyStreamTask implements StreamTask, InitableTask {
-    private ReadWriteTable<Integer, PageView> pageViewTable;
-    @Override
-    public void init(Context context) throws Exception {
-      pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1");
-    }
-    @Override
-    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
-      PageView pv = (PageView) message.getMessage();
-      pageViewTable.put(pv.getMemberId(), pv);
-      PageView pv2 = pageViewTable.get(pv.getMemberId());
-      Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
-      Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
new file mode 100644
index 0000000..0303c26
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This test class tests sendTo() and join() for local tables
+ */
+public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness {
+
+  @Test
+  public void testSendTo() throws Exception {
+    // Runs a job that sends the Profile input to table "t1", then expects the map function of
+    // every partition's task to have seen `count` profiles and to read each one back from the table.
+    int count = 10;
+    Profile[] profiles = TestTableData.generateProfiles(count);
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    MyMapFunction mapFn = new MyMapFunction();
+    final StreamApplication app = appDesc -> {
+      // Diamond instead of the raw descriptor type keeps the table's key/value types checked.
+      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor<>("t1",
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.getInputStream(isd)
+          .map(mapFn)
+          .sendTo(table);
+    };
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    for (int i = 0; i < partitionCount; i++) {
+      String taskName = String.format("Partition %d", i);
+      MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(taskName);
+      // Fail with a readable message instead of a NullPointerException on the next line.
+      Assert.assertNotNull("No map function registered for task " + taskName, mapFnCopy);
+      assertEquals(count, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p ->
+          Assert.assertNotNull("Profile missing from table: " + p.getMemberId(), mapFnCopy.table.get(p.getMemberId())));
+    }
+  }
+
+  static class StreamTableJoinApp implements StreamApplication {
+    static List<PageView> received = new LinkedList<>(); // every PageView seen by the map step below
+    static List<EnrichedPageView> joined = new LinkedList<>(); // every message emitted by the join
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      Table<KV<Integer, Profile>> table = appDesc.getTable(
+          new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.getInputStream(profileISD)
+          .map(m -> KV.of(m.getMemberId(), m))
+          .sendTo(table);
+      // Page views are recorded, re-keyed by member id, joined against the profile table and collected.
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.getInputStream(pageViewISD)
+          .map(pv -> {
+              received.add(pv);
+              return pv;
+            })
+          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .join(table, new PageViewToProfileJoinFunction())
+          .sink((m, collector, coordinator) -> joined.add(m));
+    }
+  }
+
+  @Test
+  public void testStreamTableJoin() throws Exception {
+    // Configure the PageView and Profile inputs, run StreamTableJoinApp, then check the record counts.
+    final int recordCount = 10;
+    final int partitions = 4;
+    PageView[] views = TestTableData.generatePageViews(recordCount);
+    Profile[] members = TestTableData.generateProfiles(recordCount);
+
+    Map<String, String> jobSettings = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    jobSettings.put("streams.PageView.samza.system", "test");
+    jobSettings.put("streams.PageView.source", Base64Serializer.serialize(views));
+    jobSettings.put("streams.PageView.partitionCount", String.valueOf(partitions));
+    // The Profile stream is additionally flagged as a bootstrap stream.
+    jobSettings.put("streams.Profile.samza.system", "test");
+    jobSettings.put("streams.Profile.samza.bootstrap", "true");
+    jobSettings.put("streams.Profile.source", Base64Serializer.serialize(members));
+    jobSettings.put("streams.Profile.partitionCount", String.valueOf(partitions));
+
+    Config jobConfig = new MapConfig(jobSettings);
+    final LocalApplicationRunner appRunner = new LocalApplicationRunner(new StreamTableJoinApp(), jobConfig);
+    executeRun(appRunner, jobConfig);
+    appRunner.waitForFinish();
+
+    final int expectedTotal = recordCount * partitions;
+    assertEquals(expectedTotal, StreamTableJoinApp.received.size());
+    assertEquals(expectedTotal, StreamTableJoinApp.joined.size());
+    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+  }
+
+  static class DualStreamTableJoinApp implements StreamApplication {
+    static List<Profile> sentToProfileTable1 = new LinkedList<>();
+    static List<Profile> sentToProfileTable2 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+      KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
+
+      PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
+      PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
+
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor<>("t1", profileKVSerde));
+
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
+      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+      MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
+      MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
+      // Both profile streams are recorded in their own list and written into the same table.
+      profileStream1
+          .map(m -> {
+              sentToProfileTable1.add(m);
+              return KV.of(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+      profileStream2
+          .map(m -> {
+              sentToProfileTable2.add(m);
+              return KV.of(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+
+      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
+      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+      MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
+      MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
+      // Each page-view stream is re-keyed by member id and joined against that shared table.
+      pageViewStream1
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+          .join(profileTable, joinFn1)
+          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+      pageViewStream2
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+          .join(profileTable, joinFn2)
+          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
+    }
+  }
+
+  /** Runs {@link DualStreamTableJoinApp} and checks how many profiles were sent and page views joined per branch. */
+  @Test
+  public void testDualStreamTableJoin() throws Exception {
+    final int numRecords = 10;
+    final int numPartitions = 4;
+    PageView[] views = TestTableData.generatePageViews(numRecords);
+    Profile[] members = TestTableData.generateProfiles(numRecords);
+
+    Map<String, String> jobProps = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    // Both profile inputs carry the same data and are marked as bootstrap streams.
+    jobProps.put("streams.Profile1.samza.system", "test");
+    jobProps.put("streams.Profile1.source", Base64Serializer.serialize(members));
+    jobProps.put("streams.Profile1.samza.bootstrap", "true");
+    jobProps.put("streams.Profile1.partitionCount", String.valueOf(numPartitions));
+    jobProps.put("streams.Profile2.samza.system", "test");
+    jobProps.put("streams.Profile2.source", Base64Serializer.serialize(members));
+    jobProps.put("streams.Profile2.samza.bootstrap", "true");
+    jobProps.put("streams.Profile2.partitionCount", String.valueOf(numPartitions));
+
+    // Both page-view inputs carry the same data as well.
+    jobProps.put("streams.PageView1.samza.system", "test");
+    jobProps.put("streams.PageView1.source", Base64Serializer.serialize(views));
+    jobProps.put("streams.PageView1.partitionCount", String.valueOf(numPartitions));
+    jobProps.put("streams.PageView2.samza.system", "test");
+    jobProps.put("streams.PageView2.source", Base64Serializer.serialize(views));
+    jobProps.put("streams.PageView2.partitionCount", String.valueOf(numPartitions));
+
+    Config jobConfig = new MapConfig(jobProps);
+    final LocalApplicationRunner appRunner = new LocalApplicationRunner(new DualStreamTableJoinApp(), jobConfig);
+    executeRun(appRunner, jobConfig);
+    appRunner.waitForFinish();
+
+    final int expected = numRecords * numPartitions;
+    assertEquals(expected, DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(expected, DualStreamTableJoinApp.sentToProfileTable2.size());
+
+    assertEquals(expected, DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(expected, DualStreamTableJoinApp.joinedPageViews2.size());
+    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
+    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+  }
+
+  /** Builds the shared base job config: a "test" system made by ArraySystemFactory plus Kafka as default system. */
+  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
+    Map<String, String> base = new HashMap<>();
+    // Factory for the "test" system that the tests' input streams are assigned to.
+    base.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
+    // Job identity, coordinator factory and task grouper factory.
+    base.put(JobConfig.JOB_NAME(), "test-table-job");
+    base.put(JobConfig.PROCESSOR_ID(), "1");
+    base.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    base.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+    // For intermediate streams
+    base.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    base.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
+    base.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
+    base.put("systems.kafka.samza.key.serde", "int");
+    base.put("systems.kafka.samza.msg.serde", "json");
+    base.put("systems.kafka.default.stream.replication.factor", "1");
+    base.put("job.default.system", "kafka");
+    // Registry entries for the "int" and "json" serde names used by the kafka settings above.
+    base.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
+    base.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+    return base;
+  }
+
+  /** Map function that keeps the profiles it receives and registers itself under its task's name. */
+  private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
+    // Task name -> instance initialized for that task; filled in init(), read by getMapFunctionByTask().
+    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+    private transient List<Profile> received;
+    private transient ReadWriteTable<Integer, Profile> table;
+
+    @Override
+    public void init(Context context) {
+      table = context.getTaskContext().getTable("t1");
+      this.received = new ArrayList<>();
+      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+    }
+
+    @Override
+    public KV<Integer, Profile> apply(Profile profile) {
+      received.add(profile);
+      // Diamond instead of the raw KV constructor, so key and value types are checked at compile time.
+      return new KV<>(profile.getMemberId(), profile);
+    }
+
+    public static MyMapFunction getMapFunctionByTask(String taskName) {
+      return taskToMapFunctionMap.get(taskName);
+    }
+  }
+
+  /** Runs {@link MyTaskApplication} on PageView input; the assertions are made in MyStreamTask#process. */
+  @Test
+  public void testWithLowLevelApi() throws Exception {
+    Map<String, String> jobProps = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    jobProps.put("streams.PageView.samza.system", "test");
+    jobProps.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+    jobProps.put("streams.PageView.partitionCount", Integer.toString(4));
+    jobProps.put("task.inputs", "test.PageView");
+
+    Config jobConfig = new MapConfig(jobProps);
+    final LocalApplicationRunner appRunner = new LocalApplicationRunner(new MyTaskApplication(), jobConfig);
+    executeRun(appRunner, jobConfig);
+    appRunner.waitForFinish();
+  }
+
+  /** Low-level application: PageView input on the "test" system, in-memory table "t1", MyStreamTask as task. */
+  public static class MyTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDescriptor.withInputStream(pageViewISD)
+          .withTable(new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
+          .withTaskFactory((StreamTaskFactory) MyStreamTask::new);
+    }
+  }
+
+  public static class MyStreamTask implements StreamTask, InitableTask { // round-trips each PageView through "t1"
+    private ReadWriteTable<Integer, PageView> pageViewTable;
+    @Override
+    public void init(Context context) throws Exception {
+      this.pageViewTable = context.getTaskContext().getTable("t1");
+    }
+    @Override
+    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+      final PageView written = (PageView) message.getMessage();
+      pageViewTable.put(written.getMemberId(), written);
+      final PageView readBack = pageViewTable.get(written.getMemberId());
+      Assert.assertEquals(written.getMemberId(), readBack.getMemberId());
+      Assert.assertEquals(written.getPageKey(), readBack.getPageKey());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
deleted file mode 100644
index 3de8300..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import com.google.common.cache.CacheBuilder;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
-import org.apache.samza.table.remote.RemoteReadableTable;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.TableRateLimiter;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
-import org.apache.samza.util.RateLimiter;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.generatePageViews;
-import static org.apache.samza.test.table.TestTableData.generateProfiles;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.withSettings;
-
-
-public class TestRemoteTable extends AbstractIntegrationTestHarness {
-
-  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
-
-  static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
-    private final String serializedProfiles;
-    private transient Map<Integer, Profile> profileMap;
-
-    private InMemoryReadFunction(String profiles) {
-      this.serializedProfiles = profiles;
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
-      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
-    }
-
-    @Override
-    public CompletableFuture<Profile> getAsync(Integer key) {
-      return CompletableFuture.completedFuture(profileMap.get(key));
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-
-    static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
-      return new InMemoryReadFunction(serializedProfiles);
-    }
-  }
-
-  static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
-    private transient List<EnrichedPageView> records;
-    private String testName;
-
-    public InMemoryWriteFunction(String testName) {
-      this.testName = testName;
-    }
-
-    // Verify serializable functionality
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-
-      // Write to the global list for verification
-      records = writtenRecords.get(testName);
-    }
-
-    @Override
-    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
-      records.add(record);
-      return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> deleteAsync(Integer key) {
-      records.remove(key);
-      return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
-    CachingTableDescriptor<K, V> cachingDesc;
-    if (defaultCache) {
-      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
-      cachingDesc.withReadTtl(Duration.ofMinutes(5));
-      cachingDesc.withWriteTtl(Duration.ofMinutes(5));
-    } else {
-      GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
-      guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
-      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
-    }
-
-    return appDesc.getTable(cachingDesc);
-  }
-
-  static class MyReadFunction implements TableReadFunction {
-    @Override
-    public CompletableFuture getAsync(Object key) {
-      return null;
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
-    final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
-
-    writtenRecords.put(testName, new ArrayList<>());
-
-    int count = 10;
-    PageView[] pageViews = generatePageViews(count);
-    String profiles = Base64Serializer.serialize(generateProfiles(count));
-
-    int partitionCount = 4;
-    Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
-    final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
-    final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
-    final StreamApplication app = appDesc -> {
-      RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
-      inputTableDesc
-          .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
-          .withRateLimiter(readRateLimiter, null, null);
-
-      // dummy reader
-      TableReadFunction readFn = new MyReadFunction();
-
-      RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
-      outputTableDesc
-          .withReadFunction(readFn)
-          .withWriteFunction(writer)
-          .withRateLimiter(writeRateLimiter, null, null);
-
-      Table<KV<Integer, EnrichedPageView>> outputTable = withCache
-          ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
-          : appDesc.getTable(outputTableDesc);
-
-      Table<KV<Integer, Profile>> inputTable = withCache
-          ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
-          : appDesc.getTable(inputTableDesc);
-
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDesc.getInputStream(isd)
-          .map(pv -> new KV<>(pv.getMemberId(), pv))
-          .join(inputTable, new PageViewToProfileJoinFunction())
-          .map(m -> new KV(m.getMemberId(), m))
-          .sendTo(outputTable);
-    };
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    int numExpected = count * partitionCount;
-    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
-    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
-  }
-
-  @Test
-  public void testStreamTableJoinRemoteTable() throws Exception {
-    doTestStreamTableJoinRemoteTable(false, false, "testStreamTableJoinRemoteTable");
-  }
-
-  @Test
-  public void testStreamTableJoinRemoteTableWithCache() throws Exception {
-    doTestStreamTableJoinRemoteTable(true, false, "testStreamTableJoinRemoteTableWithCache");
-  }
-
-  @Test
-  public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception {
-    doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache");
-  }
-
-  private Context createMockContext() {
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
-    doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
-    Context context = new MockContext();
-    doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
-    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
-    return context;
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testCatchReaderException() {
-    TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
-    CompletableFuture<String> future = new CompletableFuture<>();
-    future.completeExceptionally(new RuntimeException("Expected test exception"));
-    doReturn(future).when(reader).getAsync(anyString());
-    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
-    RemoteReadableTable<String, ?> table = new RemoteReadableTable<>(
-        "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
-    table.init(createMockContext());
-    table.get("abc");
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testCatchWriterException() {
-    TableReadFunction<String, String> reader = mock(TableReadFunction.class);
-    TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
-    CompletableFuture<String> future = new CompletableFuture<>();
-    future.completeExceptionally(new RuntimeException("Expected test exception"));
-    doReturn(future).when(writer).putAsync(anyString(), any());
-    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
-    RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>(
-        "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
-    table.init(createMockContext());
-    table.put("abc", "efg");
-  }
-}