You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/26 23:32:49 UTC

samza git commit: SAMZA-2006: Removed config from table provider constructor

Repository: samza
Updated Branches:
  refs/heads/master dccada916 -> f529722f5


SAMZA-2006: Removed config from table provider constructor

With the latest API change in Samza 1.0, config can be obtained from the Context object during init(), therefore we do not need to pass it in the constructor.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #816 from weisong44/SAMZA-2006


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f529722f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f529722f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f529722f

Branch: refs/heads/master
Commit: f529722f541e5475d5b558fe4caefd7b9d04f754
Parents: dccada9
Author: Wei Song <ws...@linkedin.com>
Authored: Mon Nov 26 15:32:31 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Mon Nov 26 15:32:31 2018 -0800

----------------------------------------------------------------------
 .../samza/table/TableProviderFactory.java       |   4 +-
 .../apache/samza/table/BaseTableProvider.java   |   8 +-
 .../org/apache/samza/table/TableManager.java    |   2 +-
 .../table/caching/CachingTableProvider.java     |  15 +-
 .../caching/CachingTableProviderFactory.java    |   5 +-
 .../caching/guava/GuavaCacheTableProvider.java  |   9 +-
 .../guava/GuavaCacheTableProviderFactory.java   |   5 +-
 .../samza/table/remote/RemoteTableProvider.java |  23 +-
 .../remote/RemoteTableProviderFactory.java      |   5 +-
 .../apache/samza/table/TestTableManager.java    |   3 +-
 .../samza/table/caching/TestCachingTable.java   |   8 +-
 .../descriptors/TestLocalTableDescriptor.java   |   2 +-
 .../table/remote/TestRemoteReadWriteTable.java  | 458 +++++++++++++++++++
 .../samza/table/remote/TestRemoteTable.java     | 458 -------------------
 .../descriptors/TestRemoteTableDescriptor.java  |  19 +-
 .../retry/TestRetriableTableFunctions.java      |   4 +-
 .../samza/storage/kv/LocalTableProvider.java    |   6 +-
 .../storage/kv/LocalTableProviderFactory.java   |   5 +-
 .../kv/descriptors/TestLocalTableProvider.java  |   3 +-
 .../samza/test/table/TestRemoteTable.java       |   9 +-
 20 files changed, 524 insertions(+), 527 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
index 86214a5..0ed6ad0 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
@@ -19,7 +19,6 @@
 package org.apache.samza.table;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
 
 
 /**
@@ -30,8 +29,7 @@ public interface TableProviderFactory {
   /**
    * Construct a table provider based on job configuration
    * @param tableId Id of the table
-   * @param config Job configuration
    * @return the constructed table provider
    */
-  TableProvider getTableProvider(String tableId, Config config);
+  TableProvider getTableProvider(String tableId);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
index bb83e9c..7ad423d 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.table;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,19 +32,14 @@ abstract public class BaseTableProvider implements TableProvider {
 
   final protected String tableId;
 
-  // Job config
-  final protected Config config;
-
   protected Context context;
 
   /**
    * Construct the table provider using table Id and job configuration
    * @param tableId Id of the table
-   * @param config job configuration
    */
-  public BaseTableProvider(String tableId, Config config) {
+  public BaseTableProvider(String tableId) {
     this.tableId = tableId;
-    this.config = config;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index cae7548..d3ba771 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -94,7 +94,7 @@ public class TableManager {
     TableProviderFactory tableProviderFactory =
         Util.getObj(providerFactoryClassName, TableProviderFactory.class);
     TableCtx ctx = new TableCtx();
-    ctx.tableProvider = tableProviderFactory.getTableProvider(tableId, config);
+    ctx.tableProvider = tableProviderFactory.getTableProvider(tableId);
     tableContexts.put(tableId, ctx);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index f78347a..d835809 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
@@ -32,6 +31,7 @@ import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.BaseTableProvider;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 
 /**
@@ -42,13 +42,15 @@ public class CachingTableProvider extends BaseTableProvider {
   // Store the cache instances created by default
   private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
 
-  public CachingTableProvider(String tableId, Config config) {
-    super(tableId, config);
+  public CachingTableProvider(String tableId) {
+    super(tableId);
   }
 
   @Override
   public Table getTable() {
-    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
+
+    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID);
     ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
 
@@ -58,7 +60,7 @@ public class CachingTableProvider extends BaseTableProvider {
     if (cacheTableId != null) {
       cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
     } else {
-      cache = createDefaultCacheTable(realTableId);
+      cache = createDefaultCacheTable(realTableId, tableConfig);
       defaultCaches.add(cache);
     }
 
@@ -74,8 +76,7 @@ public class CachingTableProvider extends BaseTableProvider {
     defaultCaches.forEach(c -> c.close());
   }
 
-  private ReadWriteTable createDefaultCacheTable(String tableId) {
-    JavaTableConfig tableConfig = new JavaTableConfig(config);
+  private ReadWriteTable createDefaultCacheTable(String tableId, JavaTableConfig tableConfig) {
     long readTtlMs = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.READ_TTL_MS, "-1"));
     long writeTtlMs = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.WRITE_TTL_MS, "-1"));
     long cacheSize = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_SIZE, "-1"));

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
index f421538..ab76bf2 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.table.caching;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 
@@ -28,8 +27,8 @@ import org.apache.samza.table.TableProviderFactory;
  */
 public class CachingTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(String tableId, Config config) {
-    return new CachingTableProvider(tableId, config);
+  public TableProvider getTableProvider(String tableId) {
+    return new CachingTableProvider(tableId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index 21c78cc..e45719e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.table.caching.guava;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.BaseTableProvider;
@@ -39,13 +39,14 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
 
   private List<GuavaCacheTable> guavaTables = new ArrayList<>();
 
-  public GuavaCacheTableProvider(String tableId, Config config) {
-    super(tableId, config);
+  public GuavaCacheTableProvider(String tableId) {
+    super(tableId);
   }
 
   @Override
   public Table getTable() {
-    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
+    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,
         tableConfig.getForTable(tableId, GuavaCacheTableDescriptor.GUAVA_CACHE));
     GuavaCacheTable table = new GuavaCacheTable(tableId, guavaCache);

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
index d5323f5..6cff607 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.table.caching.guava;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 
@@ -28,7 +27,7 @@ import org.apache.samza.table.TableProviderFactory;
  */
 public class GuavaCacheTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(String tableId, Config config) {
-    return new GuavaCacheTableProvider(tableId, config);
+  public TableProvider getTableProvider(String tableId) {
+    return new GuavaCacheTableProvider(tableId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 8f0b2fd..93a0521 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -19,8 +19,9 @@
 
 package org.apache.samza.table.remote;
 
-import org.apache.samza.config.Config;
+import com.google.common.base.Preconditions;
 import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.context.Context;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.retry.RetriableReadFunction;
@@ -44,10 +45,10 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class RemoteTableProvider extends BaseTableProvider {
 
-
-  private final boolean readOnly;
   private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
 
+  private boolean readOnly;
+
   /**
    * Map of tableId -> executor service for async table IO and callbacks. The same executors
    * are shared by both read/write operations such that tables of the same tableId all share
@@ -57,17 +58,25 @@ public class RemoteTableProvider extends BaseTableProvider {
   private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
   private static ScheduledExecutorService retryExecutor;
 
-  public RemoteTableProvider(String tableId, Config config) {
-    super(tableId, config);
-    JavaTableConfig tableConfig = new JavaTableConfig(config);
+  public RemoteTableProvider(String tableId) {
+    super(tableId);
+  }
+
+  @Override
+  public void init(Context context) {
+    super.init(context);
+    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
   }
 
   @Override
   public Table getTable() {
+
+    Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
+
     RemoteReadableTable table;
 
-    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
 
     TableReadFunction readFn = getReadFn(tableConfig);
     RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
index 723288d..809131a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.table.remote;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 
@@ -28,7 +27,7 @@ import org.apache.samza.table.TableProviderFactory;
  */
 public class RemoteTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(String tableId, Config config) {
-    return new RemoteTableProvider(tableId, config);
+  public TableProvider getTableProvider(String tableId) {
+    return new RemoteTableProvider(tableId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 1bbf3af..a3b1963 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -20,7 +20,6 @@ package org.apache.samza.table;
 
 import junit.framework.Assert;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
@@ -54,7 +53,7 @@ public class TestTableManager {
     static TableProvider tableProvider;
 
     @Override
-    public TableProvider getTableProvider(String tableId, Config config) {
+    public TableProvider getTableProvider(String tableId) {
       table = mock(ReadableTable.class);
       tableProvider = mock(TableProvider.class);
       when(tableProvider.getTable()).thenReturn(table);

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 36ec46f..bfb329a 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -22,7 +22,6 @@ package org.apache.samza.table.caching;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
@@ -159,9 +158,6 @@ public class TestCachingTable {
       desc.withWriteAround();
     }
 
-    Config config = new MapConfig(desc.toConfig(new MapConfig()));
-    CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableId(), config);
-
     Context context = new MockContext();
     final ReadWriteTable cacheTable = getMockCache().getLeft();
 
@@ -189,6 +185,10 @@ public class TestCachingTable {
 
     when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
 
+    Map<String, String> tableConfig = desc.toConfig(new MapConfig());
+    when(context.getJobContext().getConfig()).thenReturn(new MapConfig(tableConfig));
+
+    CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableId());
     tableProvider.init(context);
 
     CachingTable cachingTable = (CachingTable) tableProvider.getTable();

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
index 99669bf..486295a 100644
--- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
@@ -154,7 +154,7 @@ public class TestLocalTableDescriptor {
 
   public static class MockTableProviderFactory implements TableProviderFactory {
     @Override
-    public TableProvider getTableProvider(String tableId, Config config) {
+    public TableProvider getTableProvider(String tableId) {
       return new MockTableProvider();
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
new file mode 100644
index 0000000..d1369d0
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import junit.framework.Assert;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteReadWriteTable {
+  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+  public static Context getMockContext() {
+    Context context = new MockContext();
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
+    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
+    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
+    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+    return context;
+  }
+
+  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
+      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
+    return getTable(tableId, readFn, writeFn, null);
+  }
+
+  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
+      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
+    RemoteReadableTable<K, V> table;
+
+    TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
+    TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
+    doReturn(true).when(readRateLimiter).isRateLimited();
+    doReturn(true).when(writeRateLimiter).isRateLimited();
+
+    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
+
+    if (writeFn == null) {
+      table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
+    } else {
+      table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
+    }
+
+    Context context = getMockContext();
+
+    table.init(context);
+
+    return (T) table;
+  }
+
+  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+    String tableId = "testGet-" + sync + error + retry;
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    // Sync is backed by async so needs to mock the async method
+    CompletableFuture<String> future;
+    if (error) {
+      future = new CompletableFuture();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+      if (!retry) {
+        doReturn(future).when(readFn).getAsync(anyString());
+      } else {
+        final int [] times = new int[] {0};
+        doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
+            .when(readFn).getAsync(anyString());
+      }
+    } else {
+      future = CompletableFuture.completedFuture("bar");
+      doReturn(future).when(readFn).getAsync(anyString());
+    }
+    if (retry) {
+      doReturn(true).when(readFn).isRetriable(any());
+      TableRetryPolicy policy = new TableRetryPolicy();
+      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
+    }
+    RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
+    Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
+    verify(table.readRateLimiter, times(1)).throttle(anyString());
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    doTestGet(true, false, false);
+  }
+
+  @Test
+  public void testGetAsync() throws Exception {
+    doTestGet(false, false, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testGetAsyncError() throws Exception {
+    doTestGet(false, true, false);
+  }
+
+  @Test
+  public void testGetAsyncErrorRetried() throws Exception {
+    doTestGet(false, true, true);
+  }
+
+  @Test
+  public void testGetMultipleTables() {
+    TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
+    TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
+
+    // Sync is backed by async, so stub the async method on each table's own read function
+    doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
+    doReturn(CompletableFuture.completedFuture("bar2")).when(readFn2).getAsync(anyString());
+
+    RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
+    RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+
+    CompletableFuture<String> future1 = table1.getAsync("foo1");
+    CompletableFuture<String> future2 = table2.getAsync("foo2");
+    // join() below so a failed lookup or assertion fails the test instead of being silently dropped
+    CompletableFuture.allOf(future1, future2)
+        .thenAccept(u -> {
+            Assert.assertEquals("bar1", future1.join());
+            Assert.assertEquals("bar2", future2.join());
+          }).join();
+  }
+
+  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+    String tableId = "testPut-" + sync + error + isDelete + retry;
+    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+    TableWriteFunction<String, String> writeFn = mockWriteFn;
+    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+    CompletableFuture<Void> failureFuture = new CompletableFuture();
+    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+    if (!error) {
+      if (isDelete) {
+        doReturn(successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(successFuture).when(writeFn).putAsync(any(), any());
+      }
+    } else if (!retry) {
+      if (isDelete) {
+        doReturn(failureFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+      }
+    } else {
+      doReturn(true).when(writeFn).isRetriable(any());
+      final int [] times = new int[] {0};
+      if (isDelete) {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+      }
+      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
+    }
+    RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+    if (sync) {
+      table.put("foo", isDelete ? null : "bar");
+    } else {
+      table.putAsync("foo", isDelete ? null : "bar").get();
+    }
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
+    if (isDelete) {
+      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
+    } else {
+      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
+      Assert.assertEquals("bar", valCaptor.getValue());
+    }
+    Assert.assertEquals("foo", keyCaptor.getValue());
+    if (isDelete) {
+      verify(table.writeRateLimiter, times(1)).throttle(anyString());
+    } else {
+      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
+    }
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    doTestPut(true, false, false, false);
+  }
+
+  @Test
+  public void testPutDelete() throws Exception {
+    doTestPut(true, false, true, false);
+  }
+
+  @Test
+  public void testPutAsync() throws Exception {
+    doTestPut(false, false, false, false);
+  }
+
+  @Test
+  public void testPutAsyncDelete() throws Exception {
+    doTestPut(false, false, true, false);
+  }
+
+  @Test(expected = ExecutionException.class)
+  public void testPutAsyncError() throws Exception {
+    doTestPut(false, true, false, false);
+  }
+
+  /** Asynchronous put with error and retry flags set; the helper then expects {@code putAsync} to be invoked twice. */
+  @Test
+  public void testPutAsyncErrorRetried() throws Exception {
+    final boolean sync = false;
+    final boolean error = true;
+    final boolean isDelete = false;
+    final boolean retry = true;
+    doTestPut(sync, error, isDelete, retry);
+  }
+
+  /**
+   * Exercises {@code delete}/{@code deleteAsync} for key "foo" on a table built over a mocked
+   * write function, then checks that the write function's {@code deleteAsync} received "foo"
+   * exactly once and that the write rate limiter was asked to throttle once.
+   *
+   * @param sync  {@code true} to call {@code table.delete}; {@code false} to call
+   *              {@code table.deleteAsync} and wait on the returned future with {@code get()}
+   * @param error {@code true} to make the mocked {@code deleteAsync} return a future that is
+   *              already completed exceptionally with a {@link RuntimeException}
+   * @throws Exception whatever the table call or {@code get()} propagates
+   */
+  private void doTestDelete(boolean sync, boolean error) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      // Diamond rather than the raw type, so the assignment is type-checked without an unchecked warning
+      future = new CompletableFuture<>();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).deleteAsync(any());
+    ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
+    if (sync) {
+      table.delete("foo");
+    } else {
+      table.deleteAsync("foo").get();
+    }
+    verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
+    Assert.assertEquals("foo", argCaptor.getValue());
+    verify(table.writeRateLimiter, times(1)).throttle(anyString());
+  }
+
+  /** Synchronous {@code delete("foo")} with a write function that completes normally. */
+  @Test
+  public void testDelete() throws Exception {
+    final boolean sync = true;
+    final boolean error = false;
+    doTestDelete(sync, error);
+  }
+
+  /** Asynchronous {@code deleteAsync("foo")} awaited with {@code get()}, completing normally. */
+  @Test
+  public void testDeleteAsync() throws Exception {
+    final boolean sync = false;
+    final boolean error = false;
+    doTestDelete(sync, error);
+  }
+
+  /** Asynchronous delete against an exceptionally completed future; passes only if an ExecutionException is thrown. */
+  @Test(expected = ExecutionException.class)
+  public void testDeleteAsyncError() throws Exception {
+    final boolean sync = false;
+    final boolean error = true;
+    doTestDelete(sync, error);
+  }
+
+  /**
+   * Exercises {@code getAll}/{@code getAllAsync} for keys "foo1" and "foo2" on a read-only table
+   * built over a mocked read function, asserting that the table returns the same map the read
+   * function produced and that the read rate limiter was asked to throttle once.
+   *
+   * @param sync    {@code true} to call {@code table.getAll}; {@code false} to call
+   *                {@code table.getAllAsync} and wait on the returned future with {@code get()}
+   * @param error   {@code true} to make the mocked {@code getAllAsync} return a future that is
+   *                already completed exceptionally with a {@link RuntimeException}
+   * @param partial {@code true} to have the read function return only the "foo1" entry even
+   *                though both keys are requested
+   * @throws Exception whatever the table call or {@code get()} propagates
+   */
+  private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    Map<String, String> res = new HashMap<>();
+    res.put("foo1", "bar1");
+    if (!partial) {
+      res.put("foo2", "bar2");
+    }
+    CompletableFuture<Map<String, String>> future;
+    if (error) {
+      // Diamond rather than the raw type, so the assignment is type-checked without an unchecked warning
+      future = new CompletableFuture<>();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(res);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(readFn).getAllAsync(any());
+    RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
+    Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
+        : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
+    verify(table.readRateLimiter, times(1)).throttle(anyCollection());
+  }
+
+  /** Synchronous {@code getAll} returning both requested entries. */
+  @Test
+  public void testGetAll() throws Exception {
+    final boolean sync = true;
+    final boolean error = false;
+    final boolean partial = false;
+    doTestGetAll(sync, error, partial);
+  }
+
+  /** Asynchronous {@code getAllAsync} awaited with {@code get()}, returning both requested entries. */
+  @Test
+  public void testGetAllAsync() throws Exception {
+    final boolean sync = false;
+    final boolean error = false;
+    final boolean partial = false;
+    doTestGetAll(sync, error, partial);
+  }
+
+  /** Asynchronous getAll against an exceptionally completed future; passes only if an ExecutionException is thrown. */
+  @Test(expected = ExecutionException.class)
+  public void testGetAllAsyncError() throws Exception {
+    final boolean sync = false;
+    final boolean error = true;
+    final boolean partial = false;
+    doTestGetAll(sync, error, partial);
+  }
+
+  // A read function may return fewer entries than were requested; the table must pass that
+  // partial map through unchanged rather than treat it as a failure.
+  @Test
+  public void testGetAllPartialResult() throws Exception {
+    final boolean sync = false;
+    final boolean error = false;
+    final boolean partial = true;
+    doTestGetAll(sync, error, partial);
+  }
+
+  /**
+   * Exercises {@code putAll}/{@code putAllAsync} with two entries ("foo1" and "foo2") on a table
+   * built over a mocked write function.
+   *
+   * <p>Without deletes, the write function's {@code putAllAsync} must receive the entry list
+   * unchanged. With {@code hasDelete}, the "foo2" entry carries a {@code null} value and the
+   * helper expects {@code deleteAllAsync} to be called once with {@code ["foo2"]},
+   * {@code putAllAsync} to receive only the "foo1" entry, and the write rate limiter's
+   * {@code throttle(Collection)} to be called once. In both cases {@code throttleRecords} must be
+   * called once.
+   *
+   * @param sync      {@code true} to call {@code table.putAll}; {@code false} to call
+   *                  {@code table.putAllAsync} and wait on the returned future with {@code get()}
+   * @param error     {@code true} to make the mocked async write methods return a future that is
+   *                  already completed exceptionally with a {@link RuntimeException}
+   * @param hasDelete {@code true} to give the second entry a {@code null} value
+   * @throws Exception whatever the table call or {@code get()} propagates
+   */
+  public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      // Diamond rather than the raw type, so the assignment is type-checked without an unchecked warning
+      future = new CompletableFuture<>();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).putAllAsync(any());
+    if (hasDelete) {
+      doReturn(future).when(writeFn).deleteAllAsync(any());
+    }
+    List<Entry<String, String>> entries = Arrays.asList(
+        new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
+    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+    if (sync) {
+      table.putAll(entries);
+    } else {
+      table.putAllAsync(entries).get();
+    }
+    verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
+    if (hasDelete) {
+      ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
+      verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
+      Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
+      Assert.assertEquals(1, argCaptor.getValue().size());
+      Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
+      verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+    } else {
+      Assert.assertEquals(entries, argCaptor.getValue());
+    }
+    verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
+  }
+
+  /** Synchronous {@code putAll} where every entry has a non-null value. */
+  @Test
+  public void testPutAll() throws Exception {
+    final boolean sync = true;
+    final boolean error = false;
+    final boolean hasDelete = false;
+    doTestPutAll(sync, error, hasDelete);
+  }
+
+  /** Synchronous {@code putAll} where one entry has a {@code null} value and is expected to become a delete. */
+  @Test
+  public void testPutAllHasDelete() throws Exception {
+    final boolean sync = true;
+    final boolean error = false;
+    final boolean hasDelete = true;
+    doTestPutAll(sync, error, hasDelete);
+  }
+
+  /** Asynchronous {@code putAllAsync} awaited with {@code get()}, every entry non-null. */
+  @Test
+  public void testPutAllAsync() throws Exception {
+    final boolean sync = false;
+    final boolean error = false;
+    final boolean hasDelete = false;
+    doTestPutAll(sync, error, hasDelete);
+  }
+
+  /** Asynchronous {@code putAllAsync} where one entry has a {@code null} value and is expected to become a delete. */
+  @Test
+  public void testPutAllAsyncHasDelete() throws Exception {
+    final boolean sync = false;
+    final boolean error = false;
+    final boolean hasDelete = true;
+    doTestPutAll(sync, error, hasDelete);
+  }
+
+  /** Asynchronous putAll against an exceptionally completed future; passes only if an ExecutionException is thrown. */
+  @Test(expected = ExecutionException.class)
+  public void testPutAllAsyncError() throws Exception {
+    final boolean sync = false;
+    final boolean error = true;
+    final boolean hasDelete = false;
+    doTestPutAll(sync, error, hasDelete);
+  }
+
+  /**
+   * Exercises {@code deleteAll}/{@code deleteAllAsync} for keys "foo1" and "foo2" on a table built
+   * over a mocked write function, then checks that the write function's {@code deleteAllAsync}
+   * received the same key list exactly once and that the write rate limiter's
+   * {@code throttle(Collection)} was called once.
+   *
+   * @param sync  {@code true} to call {@code table.deleteAll}; {@code false} to call
+   *              {@code table.deleteAllAsync} and wait on the returned future with {@code get()}
+   * @param error {@code true} to make the mocked {@code deleteAllAsync} return a future that is
+   *              already completed exceptionally with a {@link RuntimeException}
+   * @throws Exception whatever the table call or {@code get()} propagates
+   */
+  public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
+        mock(TableReadFunction.class), writeFn);
+    CompletableFuture<Void> future;
+    if (error) {
+      // Diamond rather than the raw type, so the assignment is type-checked without an unchecked warning
+      future = new CompletableFuture<>();
+      future.completeExceptionally(new RuntimeException("Test exception"));
+    } else {
+      future = CompletableFuture.completedFuture(null);
+    }
+    // Sync is backed by async so needs to mock the async method
+    doReturn(future).when(writeFn).deleteAllAsync(any());
+    List<String> keys = Arrays.asList("foo1", "foo2");
+    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+    if (sync) {
+      table.deleteAll(keys);
+    } else {
+      table.deleteAllAsync(keys).get();
+    }
+    verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
+    Assert.assertEquals(keys, argCaptor.getValue());
+    verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+  }
+
+  /** Synchronous {@code deleteAll} with a write function that completes normally. */
+  @Test
+  public void testDeleteAll() throws Exception {
+    final boolean sync = true;
+    final boolean error = false;
+    doTestDeleteAll(sync, error);
+  }
+
+  /** Asynchronous {@code deleteAllAsync} awaited with {@code get()}, completing normally. */
+  @Test
+  public void testDeleteAllAsync() throws Exception {
+    final boolean sync = false;
+    final boolean error = false;
+    doTestDeleteAll(sync, error);
+  }
+
+  /** Asynchronous deleteAll against an exceptionally completed future; passes only if an ExecutionException is thrown. */
+  @Test(expected = ExecutionException.class)
+  public void testDeleteAllAsyncError() throws Exception {
+    final boolean sync = false;
+    final boolean error = true;
+    doTestDeleteAll(sync, error);
+  }
+
+  /** Flushing the table must flush the underlying write function exactly once. */
+  @Test
+  public void testFlush() {
+    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+    TableReadFunction<String, String> mockReadFn = mock(TableReadFunction.class);
+    RemoteReadWriteTable<String, String> rwTable = getTable("testFlush", mockReadFn, mockWriteFn);
+    rwTable.flush();
+    // verify(mock) with no mode checks for exactly one invocation
+    verify(mockWriteFn).flush();
+  }
+
+  // Builds a read-only table with a dedicated single-thread callback executor and attaches a
+  // continuation to getAsync("foo") that checks the mocked value and that it is not running on
+  // the test thread.
+  // NOTE(review): the future returned by thenAccept() is discarded and never awaited, so an
+  // assertion failure inside the callback cannot fail this test, and the callback may not have
+  // run by the time the method returns. Confirm whether the future should be joined.
+  // NOTE(review): the executor created below is not shut down in this method.
+  @Test
+  public void testGetWithCallbackExecutor() throws Exception {
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    // Sync is backed by async so needs to mock the async method
+    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+    RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
+        Executors.newSingleThreadExecutor());
+    // Captured before the async call so the callback can compare against it
+    Thread testThread = Thread.currentThread();
+
+    table.getAsync("foo").thenAccept(result -> {
+        Assert.assertEquals("bar", result);
+        // Must be executed on the executor thread
+        Assert.assertNotSame(testThread, Thread.currentThread());
+      });
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
deleted file mode 100644
index 571f87b..0000000
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestRemoteTable {
-  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
-
-  public static Context getMockContext() {
-    Context context = new MockContext();
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
-    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
-    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
-    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
-    return context;
-  }
-
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
-    return getTable(tableId, readFn, writeFn, null);
-  }
-
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
-      TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
-    RemoteReadableTable<K, V> table;
-
-    TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
-    TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
-    doReturn(true).when(readRateLimiter).isRateLimited();
-    doReturn(true).when(writeRateLimiter).isRateLimited();
-
-    ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
-
-    if (writeFn == null) {
-      table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
-    } else {
-      table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
-    }
-
-    Context context = getMockContext();
-
-    table.init(context);
-
-    return (T) table;
-  }
-
-  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
-    String tableId = "testGet-" + sync + error + retry;
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    // Sync is backed by async so needs to mock the async method
-    CompletableFuture<String> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-      if (!retry) {
-        doReturn(future).when(readFn).getAsync(anyString());
-      } else {
-        final int [] times = new int[] {0};
-        doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
-            .when(readFn).getAsync(anyString());
-      }
-    } else {
-      future = CompletableFuture.completedFuture("bar");
-      doReturn(future).when(readFn).getAsync(anyString());
-    }
-    if (retry) {
-      doReturn(true).when(readFn).isRetriable(any());
-      TableRetryPolicy policy = new TableRetryPolicy();
-      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
-    }
-    RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
-    Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
-    verify(table.readRateLimiter, times(1)).throttle(anyString());
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    doTestGet(true, false, false);
-  }
-
-  @Test
-  public void testGetAsync() throws Exception {
-    doTestGet(false, false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testGetAsyncError() throws Exception {
-    doTestGet(false, true, false);
-  }
-
-  @Test
-  public void testGetAsyncErrorRetried() throws Exception {
-    doTestGet(false, true, true);
-  }
-
-  @Test
-  public void testGetMultipleTables() {
-    TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
-    TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
-
-    // Sync is backed by async so needs to mock the async method
-    doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
-    doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
-
-    RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
-    RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
-
-    CompletableFuture<String> future1 = table1.getAsync("foo1");
-    CompletableFuture<String> future2 = table2.getAsync("foo2");
-
-    CompletableFuture.allOf(future1, future2)
-        .thenAccept(u -> {
-            Assert.assertEquals(future1.join(), "bar1");
-            Assert.assertEquals(future2.join(), "bar1");
-          });
-  }
-
-  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
-    String tableId = "testPut-" + sync + error + isDelete + retry;
-    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
-    TableWriteFunction<String, String> writeFn = mockWriteFn;
-    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
-    CompletableFuture<Void> failureFuture = new CompletableFuture();
-    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
-    if (!error) {
-      if (isDelete) {
-        doReturn(successFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doReturn(successFuture).when(writeFn).putAsync(any(), any());
-      }
-    } else if (!retry) {
-      if (isDelete) {
-        doReturn(failureFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
-      }
-    } else {
-      doReturn(true).when(writeFn).isRetriable(any());
-      final int [] times = new int[] {0};
-      if (isDelete) {
-        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
-      } else {
-        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
-      }
-      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
-    }
-    RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
-    if (sync) {
-      table.put("foo", isDelete ? null : "bar");
-    } else {
-      table.putAsync("foo", isDelete ? null : "bar").get();
-    }
-    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
-    if (isDelete) {
-      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
-    } else {
-      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
-      Assert.assertEquals("bar", valCaptor.getValue());
-    }
-    Assert.assertEquals("foo", keyCaptor.getValue());
-    if (isDelete) {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString());
-    } else {
-      verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
-    }
-  }
-
-  @Test
-  public void testPut() throws Exception {
-    doTestPut(true, false, false, false);
-  }
-
-  @Test
-  public void testPutDelete() throws Exception {
-    doTestPut(true, false, true, false);
-  }
-
-  @Test
-  public void testPutAsync() throws Exception {
-    doTestPut(false, false, false, false);
-  }
-
-  @Test
-  public void testPutAsyncDelete() throws Exception {
-    doTestPut(false, false, true, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testPutAsyncError() throws Exception {
-    doTestPut(false, true, false, false);
-  }
-
-  @Test
-  public void testPutAsyncErrorRetried() throws Exception {
-    doTestPut(false, true, false, true);
-  }
-
-  private void doTestDelete(boolean sync, boolean error) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).deleteAsync(any());
-    ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
-    if (sync) {
-      table.delete("foo");
-    } else {
-      table.deleteAsync("foo").get();
-    }
-    verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
-    Assert.assertEquals("foo", argCaptor.getValue());
-    verify(table.writeRateLimiter, times(1)).throttle(anyString());
-  }
-
-  @Test
-  public void testDelete() throws Exception {
-    doTestDelete(true, false);
-  }
-
-  @Test
-  public void testDeleteAsync() throws Exception {
-    doTestDelete(false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testDeleteAsyncError() throws Exception {
-    doTestDelete(false, true);
-  }
-
-  private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    Map<String, String> res = new HashMap<>();
-    res.put("foo1", "bar1");
-    if (!partial) {
-      res.put("foo2", "bar2");
-    }
-    CompletableFuture<Map<String, String>> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(res);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(readFn).getAllAsync(any());
-    RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
-    Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
-        : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
-    verify(table.readRateLimiter, times(1)).throttle(anyCollection());
-  }
-
-  @Test
-  public void testGetAll() throws Exception {
-    doTestGetAll(true, false, false);
-  }
-
-  @Test
-  public void testGetAllAsync() throws Exception {
-    doTestGetAll(false, false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testGetAllAsyncError() throws Exception {
-    doTestGetAll(false, true, false);
-  }
-
-  // Partial result is an acceptable scenario
-  @Test
-  public void testGetAllPartialResult() throws Exception {
-    doTestGetAll(false, false, true);
-  }
-
-  public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).putAllAsync(any());
-    if (hasDelete) {
-      doReturn(future).when(writeFn).deleteAllAsync(any());
-    }
-    List<Entry<String, String>> entries = Arrays.asList(
-        new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
-    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
-    if (sync) {
-      table.putAll(entries);
-    } else {
-      table.putAllAsync(entries).get();
-    }
-    verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
-    if (hasDelete) {
-      ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
-      verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
-      Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
-      Assert.assertEquals(1, argCaptor.getValue().size());
-      Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
-      verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
-    } else {
-      Assert.assertEquals(entries, argCaptor.getValue());
-    }
-    verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
-  }
-
-  @Test
-  public void testPutAll() throws Exception {
-    doTestPutAll(true, false, false);
-  }
-
-  @Test
-  public void testPutAllHasDelete() throws Exception {
-    doTestPutAll(true, false, true);
-  }
-
-  @Test
-  public void testPutAllAsync() throws Exception {
-    doTestPutAll(false, false, false);
-  }
-
-  @Test
-  public void testPutAllAsyncHasDelete() throws Exception {
-    doTestPutAll(false, false, true);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testPutAllAsyncError() throws Exception {
-    doTestPutAll(false, true, false);
-  }
-
-  public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
-    } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    doReturn(future).when(writeFn).deleteAllAsync(any());
-    List<String> keys = Arrays.asList("foo1", "foo2");
-    ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
-    if (sync) {
-      table.deleteAll(keys);
-    } else {
-      table.deleteAllAsync(keys).get();
-    }
-    verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
-    Assert.assertEquals(keys, argCaptor.getValue());
-    verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
-  }
-
-  @Test
-  public void testDeleteAll() throws Exception {
-    doTestDeleteAll(true, false);
-  }
-
-  @Test
-  public void testDeleteAllAsync() throws Exception {
-    doTestDeleteAll(false, false);
-  }
-
-  @Test(expected = ExecutionException.class)
-  public void testDeleteAllAsyncError() throws Exception {
-    doTestDeleteAll(false, true);
-  }
-
-  @Test
-  public void testFlush() {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
-    table.flush();
-    verify(writeFn, times(1)).flush();
-  }
-
-  @Test
-  public void testGetWithCallbackExecutor() throws Exception {
-    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
-    // Sync is backed by async so needs to mock the async method
-    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
-    RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
-        Executors.newSingleThreadExecutor());
-    Thread testThread = Thread.currentThread();
-
-    table.getAsync("foo").thenAccept(result -> {
-        Assert.assertEquals("bar", result);
-        // Must be executed on the executor thread
-        Assert.assertNotSame(testThread, Thread.currentThread());
-      });
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index c831861..69e95d4 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -20,12 +20,12 @@
 package org.apache.samza.table.remote.descriptors;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
 import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -35,6 +35,7 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.table.remote.RemoteReadWriteTable;
 import org.apache.samza.table.remote.RemoteTableProvider;
 import org.apache.samza.table.remote.TableRateLimiter;
@@ -48,7 +49,6 @@ import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -129,15 +129,15 @@ public class TestRemoteTableDescriptor {
     desc.toConfig(new MapConfig());
   }
 
-  private Context createMockContext() {
+  private Context createMockContext(TableDescriptor tableDescriptor) {
     Context context = mock(Context.class);
 
     TaskContextImpl taskContext = mock(TaskContextImpl.class);
     when(context.getTaskContext()).thenReturn(taskContext);
 
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    when(metricsRegistry.newTimer(Matchers.anyString(), Matchers.anyString())).thenReturn(mock(Timer.class));
-    when(metricsRegistry.newCounter(Matchers.anyString(), Matchers.anyString())).thenReturn(mock(Counter.class));
+    when(metricsRegistry.newTimer(anyString(), anyString())).thenReturn(mock(Timer.class));
+    when(metricsRegistry.newCounter(anyString(), anyString())).thenReturn(mock(Counter.class));
     when(taskContext.getTaskMetricsRegistry()).thenReturn(metricsRegistry);
 
     TaskName taskName = new TaskName("MyTask");
@@ -157,6 +157,10 @@ public class TestRemoteTableDescriptor {
     when(taskContext.getJobModel()).thenReturn(jobModel);
     when(jobModel.getContainers()).thenReturn(ImmutableMap.of(containerId, containerModel));
 
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(tableDescriptor.toConfig(new MapConfig())));
+    when(context.getJobContext()).thenReturn(jobContext);
+
     return context;
   }
 
@@ -201,9 +205,8 @@ public class TestRemoteTableDescriptor {
       }
     }
 
-    Config config = new MapConfig(desc.toConfig(new MapConfig()));
-    RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId(), config);
-    provider.init(createMockContext());
+    RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId());
+    provider.init(createMockContext(desc));
     Table table = provider.getTable();
     Assert.assertTrue(table instanceof RemoteReadWriteTable);
     RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 050ea55..5116cab 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.TestRemoteTable;
+import org.apache.samza.table.remote.TestRemoteReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 import org.junit.Test;
 
@@ -54,7 +54,7 @@ public class TestRetriableTableFunctions {
 
   public TableMetricsUtil getMetricsUtil(String tableId) {
     Table table = mock(Table.class);
-    Context context = TestRemoteTable.getMockContext();
+    Context context = TestRemoteReadWriteTable.getMockContext();
     return new TableMetricsUtil(context, table, tableId);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
index 68590b1..3be61d0 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
@@ -19,7 +19,6 @@
 package org.apache.samza.storage.kv;
 
 import com.google.common.base.Preconditions;
-import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
@@ -36,8 +35,8 @@ public class LocalTableProvider extends BaseTableProvider {
 
   protected KeyValueStore kvStore;
 
-  public LocalTableProvider(String tableId, Config config) {
-    super(tableId, config);
+  public LocalTableProvider(String tableId) {
+    super(tableId);
   }
 
   @Override
@@ -55,6 +54,7 @@ public class LocalTableProvider extends BaseTableProvider {
 
   @Override
   public Table getTable() {
+    Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
     Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId);
     ReadableTable table = new LocalReadWriteTable(tableId, kvStore);
     table.init(this.context);

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProviderFactory.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProviderFactory.java
index cf9ffa4..b82ba21 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProviderFactory.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProviderFactory.java
@@ -18,14 +18,13 @@
  */
 package org.apache.samza.storage.kv;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 
 
 public class LocalTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(String tableId, Config config) {
-    return new LocalTableProvider(tableId, config);
+  public TableProvider getTableProvider(String tableId) {
+    return new LocalTableProvider(tableId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
index be80dcc..752b91e 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
@@ -20,7 +20,6 @@
 package org.apache.samza.storage.kv.descriptors;
 
 import junit.framework.Assert;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.TaskContext;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -55,7 +54,7 @@ public class TestLocalTableProvider {
   }
 
   private TableProvider createTableProvider(String tableId) {
-    return new LocalTableProvider(tableId, new MapConfig()) {
+    return new LocalTableProvider(tableId) {
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f529722f/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index b31bdf5..8218b8b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -38,7 +38,7 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
+import org.apache.samza.context.MockContext;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -72,7 +72,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 
@@ -248,10 +247,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
     doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
-    Context context = mock(Context.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
-    doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry();
+    Context context = new MockContext();
+    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
     return context;
   }