You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/09 22:58:00 UTC

[1/2] samza git commit: SAMZA-1610: Implementation of remote table provider

Repository: samza
Updated Branches:
  refs/heads/master 1971d596c -> 2be7061d4


http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
index 9c79766..05fa19a 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
@@ -18,14 +18,22 @@
  */
 package org.apache.samza.util;
 
+import java.lang.reflect.Field;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.task.TaskContext;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestEmbeddedTaggedRateLimiter {
@@ -35,42 +43,71 @@ public class TestEmbeddedTaggedRateLimiter {
   final static private int TARGET_RATE_RED = 1000;
   final static private int TARGET_RATE_PER_TASK_RED = TARGET_RATE_RED / NUMBER_OF_TASKS;
   final static private int TARGET_RATE_GREEN = 2000;
-  final static private int TARGET_RATE_PER_TASK_GREEN = TARGET_RATE_GREEN / NUMBER_OF_TASKS;
   final static private int INCREMENT = 2;
 
+  final static private int TARGET_RATE = 4000;
+  final static private int TARGET_RATE_PER_TASK = TARGET_RATE / NUMBER_OF_TASKS;
+
   @Test
   @Ignore("Flaky Test: Test fails in travis.")
   public void testAcquire() {
-    RateLimiter rateLimiter = createRateLimiter();
+    RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(TARGET_RATE);
+    initRateLimiter(rateLimiter);
 
-    Map<String, Integer> tagToCount = new HashMap<>();
-    tagToCount.put("red", 0);
-    tagToCount.put("green", 0);
+    int count = 0;
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < TEST_INTERVAL) {
+      rateLimiter.acquire(INCREMENT);
+      count += INCREMENT;
+    }
 
-    Map<String, Integer> tagToCredits = new HashMap<>();
-    tagToCredits.put("red", INCREMENT);
-    tagToCredits.put("green", INCREMENT);
+    long rate = count * 1000 / TEST_INTERVAL;
+    verifyRate(rate);
+  }
+
+  @Test
+  public void testAcquireWithTimeout() {
+    RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(TARGET_RATE);
+    initRateLimiter(rateLimiter);
+
+    boolean hasSeenZeros = false;
 
+    int count = 0;
+    int callCount = 0;
     long start = System.currentTimeMillis();
     while (System.currentTimeMillis() - start < TEST_INTERVAL) {
-      rateLimiter.acquire(tagToCredits);
-      tagToCount.put("red", tagToCount.get("red") + INCREMENT);
-      tagToCount.put("green", tagToCount.get("green") + INCREMENT);
+      ++callCount;
+      int availableCredits = rateLimiter.acquire(INCREMENT, 20, MILLISECONDS);
+      if (availableCredits <= 0) {
+        hasSeenZeros = true;
+      } else {
+        count += INCREMENT;
+      }
     }
 
-    {
-      long rate = tagToCount.get("red") * 1000 / TEST_INTERVAL;
-      verifyRate(rate, TARGET_RATE_PER_TASK_RED);
-    } {
-      // Note: due to blocking, green is capped at red's QPS
-      long rate = tagToCount.get("green") * 1000 / TEST_INTERVAL;
-      verifyRate(rate, TARGET_RATE_PER_TASK_RED);
-    }
+    long rate = count * 1000 / TEST_INTERVAL;
+    verifyRate(rate);
+    junit.framework.Assert.assertTrue(Math.abs(callCount - TARGET_RATE_PER_TASK * TEST_INTERVAL / 1000 / INCREMENT) <= 2);
+    junit.framework.Assert.assertFalse(hasSeenZeros);
   }
 
-  @Test
-  public void testTryAcquire() {
+  @Test(expected = IllegalStateException.class)
+  public void testFailsWhenUninitialized() {
+    new EmbeddedTaggedRateLimiter(100).acquire(1);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailsWhenUsingTags() {
+    RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(10);
+    initRateLimiter(rateLimiter);
+    Map<String, Integer> tagToCredits = new HashMap<>();
+    tagToCredits.put("red", 1);
+    tagToCredits.put("green", 1);
+    rateLimiter.acquire(tagToCredits);
+  }
 
+  @Test
+  public void testAcquireTagged() {
     RateLimiter rateLimiter = createRateLimiter();
 
     Map<String, Integer> tagToCount = new HashMap<>();
@@ -83,22 +120,23 @@ public class TestEmbeddedTaggedRateLimiter {
 
     long start = System.currentTimeMillis();
     while (System.currentTimeMillis() - start < TEST_INTERVAL) {
-      Map<String, Integer> resultMap = rateLimiter.tryAcquire(tagToCredits);
-      tagToCount.put("red", tagToCount.get("red") + resultMap.get("red"));
-      tagToCount.put("green", tagToCount.get("green") + resultMap.get("green"));
+      rateLimiter.acquire(tagToCredits);
+      tagToCount.put("red", tagToCount.get("red") + INCREMENT);
+      tagToCount.put("green", tagToCount.get("green") + INCREMENT);
     }
 
     {
       long rate = tagToCount.get("red") * 1000 / TEST_INTERVAL;
       verifyRate(rate, TARGET_RATE_PER_TASK_RED);
     } {
+      // Note: due to blocking, green is capped at red's QPS
       long rate = tagToCount.get("green") * 1000 / TEST_INTERVAL;
-      verifyRate(rate, TARGET_RATE_PER_TASK_GREEN);
+      verifyRate(rate, TARGET_RATE_PER_TASK_RED);
     }
   }
 
   @Test
-  public void testAcquireWithTimeout() {
+  public void testAcquireWithTimeoutTagged() {
 
     RateLimiter rateLimiter = createRateLimiter();
 
@@ -128,7 +166,7 @@ public class TestEmbeddedTaggedRateLimiter {
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testFailsWhenUninitialized() {
+  public void testFailsWhenUninitializedTagged() {
     Map<String, Integer> tagToTargetRateMap = new HashMap<>();
     tagToTargetRateMap.put("red", 1000);
     tagToTargetRateMap.put("green", 2000);
@@ -141,14 +179,14 @@ public class TestEmbeddedTaggedRateLimiter {
     tagToCredits.put("red", 1);
     tagToCredits.put("green", 1);
     RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(tagToCredits);
-    TestEmbeddedRateLimiter.initRateLimiter(rateLimiter);
+    initRateLimiter(rateLimiter);
     rateLimiter.acquire(1);
   }
 
   private void verifyRate(long rate, long targetRate) {
     // As the actual rate would likely not be exactly the same as target rate, the calculation below
-    // verifies the actual rate is within 5% of the target rate per task
-    Assert.assertTrue(Math.abs(rate - targetRate) <= targetRate * 5 / 100);
+    // verifies the actual rate is within 10% of the target rate per task
+    Assert.assertTrue(Math.abs(rate - targetRate) <= targetRate * 10 / 100);
   }
 
   private RateLimiter createRateLimiter() {
@@ -156,8 +194,36 @@ public class TestEmbeddedTaggedRateLimiter {
     tagToTargetRateMap.put("red", TARGET_RATE_RED);
     tagToTargetRateMap.put("green", TARGET_RATE_GREEN);
     RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(tagToTargetRateMap);
-    TestEmbeddedRateLimiter.initRateLimiter(rateLimiter);
+    initRateLimiter(rateLimiter);
     return rateLimiter;
   }
 
+  private void verifyRate(long rate) {
+    // As the actual rate would likely not be exactly the same as target rate, the calculation below
+    // verifies the actual rate is within 5% of the target rate per task
+    junit.framework.Assert.assertTrue(Math.abs(rate - TARGET_RATE_PER_TASK) <= TARGET_RATE_PER_TASK * 5 / 100);
+  }
+
+  static void initRateLimiter(RateLimiter rateLimiter) {
+    Config config = mock(Config.class);
+    TaskContext taskContext = mock(TaskContext.class);
+    SamzaContainerContext containerContext = mockSamzaContainerContext();
+    when(taskContext.getSamzaContainerContext()).thenReturn(containerContext);
+    rateLimiter.init(config, taskContext);
+  }
+
+  static SamzaContainerContext mockSamzaContainerContext() {
+    try {
+      Collection<String> taskNames = mock(Collection.class);
+      when(taskNames.size()).thenReturn(NUMBER_OF_TASKS);
+      SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+      Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames");
+      taskNamesField.setAccessible(true);
+      taskNamesField.set(containerContext, taskNames);
+      taskNamesField.setAccessible(false);
+      return containerContext;
+    } catch (Exception ex) {
+      throw new SamzaException(ex);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index 4af0f1d..b494eba 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -25,18 +25,26 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.storage.StorageEngine;
-import org.apache.samza.table.LocalStoreBackedTableProvider;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 
 /**
- * Base class for tables backed by Samza stores, see {@link LocalStoreBackedTableProvider}.
+ * Base class for tables backed by Samza local stores. The backing stores are
+ * injected during initialization of the table. Since the lifecycle
+ * of the underlying stores are already managed by Samza container,
+ * the table provider will not manage the lifecycle of the backing
+ * stores.
  */
-abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBackedTableProvider {
+abstract public class BaseLocalStoreBackedTableProvider implements TableProvider {
 
   protected final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -44,13 +52,28 @@ abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBac
 
   protected KeyValueStore kvStore;
 
+  protected SamzaContainerContext containerContext;
+
+  protected TaskContext taskContext;
+
   public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
     this.tableSpec = tableSpec;
   }
 
   @Override
-  public void init(StorageEngine store) {
-    kvStore = (KeyValueStore) store;
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    this.containerContext = containerContext;
+    this.taskContext = taskContext;
+
+    Preconditions.checkNotNull(this.taskContext, "Must specify task context for local tables.");
+
+    kvStore = (KeyValueStore) taskContext.getStore(tableSpec.getId());
+
+    if (kvStore == null) {
+      throw new SamzaException(String.format(
+          "Backing store for table %s was not injected by SamzaContainer", tableSpec.getId()));
+    }
+
     logger.info("Initialized backing store for table " + tableSpec.getId());
   }
 
@@ -59,17 +82,9 @@ abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBac
     if (kvStore == null) {
       throw new SamzaException("Store not initialized for table " + tableSpec.getId());
     }
-    return new LocalStoreBackedReadWriteTable(kvStore);
-  }
-
-  @Override
-  public void start() {
-    logger.info("Starting table provider for table " + tableSpec.getId());
-  }
-
-  @Override
-  public void stop() {
-    logger.info("Stopping table provider for table " + tableSpec.getId());
+    ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
+    table.init(containerContext, taskContext);
+    return table;
   }
 
   protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) {
@@ -89,4 +104,9 @@ abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBac
 
     return storeConfig;
   }
+
+  @Override
+  public void close() {
+    logger.info("Shutting down table provider for table " + tableSpec.getId());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
index 3149c86..4037f60 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -36,8 +36,8 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
    * Constructs an instance of {@link LocalStoreBackedReadWriteTable}
    * @param kvStore the backing store
    */
-  public LocalStoreBackedReadWriteTable(KeyValueStore kvStore) {
-    super(kvStore);
+  public LocalStoreBackedReadWriteTable(String tableId, KeyValueStore kvStore) {
+    super(tableId, kvStore);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
index fead086..5ff58ab 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
@@ -24,6 +24,8 @@ import java.util.stream.Collectors;
 
 import org.apache.samza.table.ReadableTable;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * A store backed readable table
@@ -34,12 +36,16 @@ import org.apache.samza.table.ReadableTable;
 public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> {
 
   protected KeyValueStore<K, V> kvStore;
+  protected String tableId;
 
   /**
    * Constructs an instance of {@link LocalStoreBackedReadableTable}
    * @param kvStore the backing store
    */
-  public LocalStoreBackedReadableTable(KeyValueStore<K, V> kvStore) {
+  public LocalStoreBackedReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
+    Preconditions.checkArgument(tableId != null & !tableId.isEmpty() , "invalid tableId");
+    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+    this.tableId = tableId;
     this.kvStore = kvStore;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
index 9c95637..d30c18f 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
@@ -27,11 +27,13 @@ import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.storage.StorageEngine;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
 import org.junit.Before;
 import org.junit.Test;
 
 import junit.framework.Assert;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -55,7 +57,9 @@ public class TestLocalBaseStoreBackedTableProvider {
   @Test
   public void testInit() {
     StorageEngine store = mock(KeyValueStorageEngine.class);
-    tableProvider.init(store);
+    TaskContext taskContext = mock(TaskContext.class);
+    when(taskContext.getStore(any())).thenReturn(store);
+    tableProvider.init(null, taskContext);
     Assert.assertNotNull(tableProvider.getTable());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 8f7eb5d..23fa9e6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -74,7 +74,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     Profile[] profiles = TestTableData.generateProfiles(count);
 
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig();
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
 
     configs.put("streams.Profile.samza.system", "test");
     configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
@@ -112,7 +112,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     Profile[] profiles = TestTableData.generateProfiles(count);
 
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig();
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
 
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
@@ -170,7 +170,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     Profile[] profiles = TestTableData.generateProfiles(count);
 
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig();
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
 
     configs.put("streams.Profile1.samza.system", "test");
     configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
@@ -239,7 +239,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
   }
 
-  private Map<String, String> getBaseJobConfig() {
+  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
     Map<String, String> configs = new HashMap<>();
     configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
 
@@ -251,8 +251,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
     // For intermediate streams
     configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
     configs.put("systems.kafka.samza.key.serde", "int");
     configs.put("systems.kafka.samza.msg.serde", "json");
     configs.put("systems.kafka.default.stream.replication.factor", "1");
@@ -281,7 +281,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     }
   }
 
-  private class PageViewToProfileJoinFunction implements StreamTableJoinFunction
+  static class PageViewToProfileJoinFunction implements StreamTableJoinFunction
       <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
     private int count;
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
new file mode 100644
index 0000000..a260c3f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.remote.RemoteReadableTable;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+
+public class TestRemoteTable extends AbstractIntegrationTestHarness {
+  private TableReadFunction<Integer, TestTableData.Profile> getInMemoryReader(TestTableData.Profile[] profiles) {
+    final Map<Integer, TestTableData.Profile> profileMap = Arrays.stream(profiles)
+        .collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
+    TableReadFunction<Integer, TestTableData.Profile> reader =
+        (TableReadFunction<Integer, TestTableData.Profile>) key -> profileMap.getOrDefault(key, null);
+    return reader;
+  }
+
+  static List<TestTableData.EnrichedPageView> writtenRecords = new LinkedList<>();
+
+  static class InMemoryWriteFunction implements TableWriteFunction<Integer, TestTableData.EnrichedPageView> {
+    private transient List<TestTableData.EnrichedPageView> records;
+
+    // Verify serializable functionality
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+
+      // Write to the global list for verification
+      records = writtenRecords;
+    }
+
+    @Override
+    public void put(Integer key, TestTableData.EnrichedPageView record) {
+      records.add(record);
+    }
+
+    @Override
+    public void delete(Integer key) {
+      records.remove(key);
+    }
+
+    @Override
+    public void deleteAll(Collection<Integer> keys) {
+      records.removeAll(keys);
+    }
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTable() throws Exception {
+    List<TestTableData.PageView> received = new LinkedList<>();
+    final InMemoryWriteFunction writer = new InMemoryWriteFunction();
+
+    int count = 10;
+    TestTableData.PageView[] pageViews = TestTableData.generatePageViews(count);
+    TestTableData.Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+    final RateLimiter readRateLimiter = mock(RateLimiter.class);
+    final RateLimiter writeRateLimiter = mock(RateLimiter.class);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    final StreamApplication app = (streamGraph, cfg) -> {
+      RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
+      inputTableDesc
+          .withReadFunction(getInMemoryReader(profiles))
+          .withRateLimiter(readRateLimiter, null, null);
+
+      RemoteTableDescriptor<Integer, TestTableData.EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+      outputTableDesc
+          .withReadFunction(key -> null) // dummy reader
+          .withWriteFunction(writer)
+          .withRateLimiter(writeRateLimiter, null, null);
+
+      Table<KV<Integer, TestTableData.Profile>> inputTable = streamGraph.getTable(inputTableDesc);
+      Table<KV<Integer, TestTableData.EnrichedPageView>> outputTable = streamGraph.getTable(outputTableDesc);
+
+      streamGraph.getInputStream("PageView", new NoOpSerde<TestTableData.PageView>())
+          .map(pv -> {
+              received.add(pv);
+              return new KV<Integer, TestTableData.PageView>(pv.getMemberId(), pv);
+            })
+          .join(inputTable, new TestLocalTable.PageViewToProfileJoinFunction())
+          .map(m -> new KV(m.getMemberId(), m))
+          .sendTo(outputTable);
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    int numExpected = count * partitionCount;
+    Assert.assertEquals(numExpected, received.size());
+    Assert.assertEquals(numExpected, writtenRecords.size());
+    Assert.assertTrue(writtenRecords.get(0) instanceof TestTableData.EnrichedPageView);
+  }
+
+  private TaskContext createMockTaskContext() {
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
+    doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
+    TaskContext context = mock(TaskContext.class);
+    doReturn(metricsRegistry).when(context).getMetricsRegistry();
+    return context;
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testCatchReaderException() {
+    TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
+    doThrow(new RuntimeException("Expected test exception")).when(reader).get(anyString());
+    RemoteReadableTable<String, ?> table = new RemoteReadableTable<>("table1", reader, null, null);
+    table.init(mock(SamzaContainerContext.class), createMockTaskContext());
+    table.get("abc");
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testCatchWriterException() {
+    TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+    TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
+    doThrow(new RuntimeException("Expected test exception")).when(writer).put(anyString(), any());
+    RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<>("table1", reader, writer, null, null, null);
+    table.init(mock(SamzaContainerContext.class), createMockTaskContext());
+    table.put("abc", "efg");
+  }
+}


[2/2] samza git commit: SAMZA-1610: Implementation of remote table provider

Posted by ja...@apache.org.
SAMZA-1610: Implementation of remote table provider

Please see commit messages for detailed descriptions.

Author: Peng Du <pd...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Wei Song <ws...@linkedin.com>

Closes #432 from pdu-mn1/remote-table-0222


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2be7061d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2be7061d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2be7061d

Branch: refs/heads/master
Commit: 2be7061d4a6ef6345b6f0566cdfeb9bbf279686f
Parents: 1971d59
Author: Peng Du <pd...@linkedin.com>
Authored: Fri Mar 9 14:57:56 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Mar 9 14:57:56 2018 -0800

----------------------------------------------------------------------
 .../table/LocalStoreBackedTableProvider.java    |  37 ---
 .../org/apache/samza/table/ReadableTable.java   |  11 +-
 .../org/apache/samza/table/TableProvider.java   |  18 +-
 .../java/org/apache/samza/util/RateLimiter.java |  20 +-
 .../org/apache/samza/table/TableManager.java    |  50 ++--
 .../samza/table/remote/CreditFunction.java      |  36 +++
 .../table/remote/RemoteReadWriteTable.java      | 178 ++++++++++++++
 .../samza/table/remote/RemoteReadableTable.java | 181 ++++++++++++++
 .../table/remote/RemoteTableDescriptor.java     | 214 ++++++++++++++++
 .../samza/table/remote/RemoteTableProvider.java | 154 ++++++++++++
 .../remote/RemoteTableProviderFactory.java      |  38 +++
 .../samza/table/remote/TableReadFunction.java   |  66 +++++
 .../samza/table/remote/TableWriteFunction.java  |  81 ++++++
 .../apache/samza/util/EmbeddedRateLimiter.java  |  98 --------
 .../samza/util/EmbeddedTaggedRateLimiter.java   |  48 ++--
 .../apache/samza/container/SamzaContainer.scala |   3 +-
 .../apache/samza/container/TaskInstance.scala   |   4 +-
 .../apache/samza/table/TestTableManager.java    |  14 +-
 .../table/remote/TestRemoteTableDescriptor.java | 244 +++++++++++++++++++
 .../samza/util/TestEmbeddedRateLimiter.java     | 155 ------------
 .../util/TestEmbeddedTaggedRateLimiter.java     | 128 +++++++---
 .../kv/BaseLocalStoreBackedTableProvider.java   |  54 ++--
 .../kv/LocalStoreBackedReadWriteTable.java      |   4 +-
 .../kv/LocalStoreBackedReadableTable.java       |   8 +-
 .../TestLocalBaseStoreBackedTableProvider.java  |   6 +-
 .../apache/samza/test/table/TestLocalTable.java |  14 +-
 .../samza/test/table/TestRemoteTable.java       | 180 ++++++++++++++
 27 files changed, 1605 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
deleted file mode 100644
index 21630ab..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import org.apache.samza.storage.StorageEngine;
-
-
-/**
- * Interface for tables backed by Samza local stores. The backing stores are
- * injected during initialization of the table. Since the lifecycle
- * of the underlying stores are already managed by Samza container,
- * the table provider will not manage the lifecycle of the backing
- * stores.
- */
-public interface LocalStoreBackedTableProvider extends TableProvider {
-  /**
-   * Initializes the table provider with the backing store
-   * @param store the backing store
-   */
-  void init(StorageEngine store);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
index 5ad6e0f..15b6115 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.operators.KV;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -34,6 +36,14 @@ import org.apache.samza.operators.KV;
  */
 @InterfaceStability.Unstable
 public interface ReadableTable<K, V> extends Table<KV<K, V>> {
+  /**
+   * Initializes the table during container initialization.
+   * Guaranteed to be invoked as the first operation on the table.
+   * @param containerContext Samza container context
+   * @param taskContext nullable for global table
+   */
+  default void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+  }
 
   /**
    * Gets the value associated with the specified {@code key}.
@@ -57,5 +67,4 @@ public interface ReadableTable<K, V> extends Table<KV<K, V>> {
    * Close the table and release any resources acquired
    */
   void close();
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 54c6f5d..bbbe38a 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -21,6 +21,8 @@ package org.apache.samza.table;
 import java.util.Map;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -30,6 +32,13 @@ import org.apache.samza.annotation.InterfaceStability;
 @InterfaceStability.Unstable
 public interface TableProvider {
   /**
+   * Initialize TableProvider with container and task context
+   * @param containerContext Samza container context
+   * @param taskContext nullable for global table
+   */
+  void init(SamzaContainerContext containerContext, TaskContext taskContext);
+
+  /**
    * Get an instance of the table for read/write operations
    * @return the underlying table
    */
@@ -46,12 +55,7 @@ public interface TableProvider {
   Map<String, String> generateConfig(Map<String, String> config);
 
   /**
-   * Start the underlying table
-   */
-  void start();
-
-  /**
-   * Stop the underlying table
+   * Shutdown the underlying table
    */
-  void stop();
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
index 75818dd..ad40d35 100644
--- a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
@@ -20,6 +20,7 @@ package org.apache.samza.util;
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.samza.annotation.InterfaceStability;
@@ -43,7 +44,6 @@ import org.apache.samza.task.TaskContext;
  * <ul>
  *   <li>Block indefinitely until requested credits become available</li>
  *   <li>Block for a provided amount of time, then return available credits</li>
- *   <li>Non-blocking, returns immediately available credits</li>
  * </ul>
  *
  */
@@ -80,15 +80,6 @@ public interface RateLimiter extends Serializable {
   int acquire(int numberOfCredit, long timeout, TimeUnit unit);
 
   /**
-   * Attempt to acquire the provided number of credits, returns immediately number of
-   * credits acquired.
-   *
-   * @param numberOfCredit requested number of credits
-   * @return number of credits acquired
-   */
-  int tryAcquire(int numberOfCredit);
-
-  /**
    * Attempt to acquire the provided number of credits for a number of tags, blocks indefinitely
    * until all requested credits become available
    *
@@ -110,11 +101,8 @@ public interface RateLimiter extends Serializable {
   Map<String, Integer> acquire(Map<String, Integer> tagToCreditMap, long timeout, TimeUnit unit);
 
   /**
-   * Attempt to acquire the provided number of credits for a number of tags, returns immediately number of
-   * credits acquired.
-   *
-   * @param tagToCreditMap a map of requested number of credits keyed by tag
-   * @return a map of number of credits acquired keyed by tag
+   * Get the entire set of tags for which we have configured credits for rate limiting.
+   * @return set of supported tags
    */
-  Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditMap);
+  Set<String> getSupportedTags();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index c3555f3..bada304 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -24,13 +24,17 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
+
 /**
  * A {@link TableManager} manages tables within a Samza task. For each table, it maintains
  * the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
@@ -61,15 +65,14 @@ public class TableManager {
   // tableId -> TableCtx
   private final Map<String, TableCtx> tables = new HashMap<>();
 
-  private boolean localTablesInitialized;
+  private boolean initialized;
 
   /**
    * Construct a table manager instance
-   * @param config the job configuration
+   * @param config job configuration
    * @param serdes Serde instances for tables
    */
   public TableManager(Config config, Map<String, Serde<Object>> serdes) {
-
     new JavaTableConfig(config).getTableIds().forEach(tableId -> {
 
         // Construct the table provider
@@ -91,23 +94,14 @@ public class TableManager {
   }
 
   /**
-   * Initialize all local table
-   * @param stores stores created locally
+   * Initialize table providers with container and task contexts
+   * @param containerContext context for the Samza container
+   * @param taskContext context for the current task, nullable for global tables
    */
-  public void initLocalTables(Map<String, StorageEngine> stores) {
-    tables.values().forEach(ctx -> {
-        if (ctx.tableProvider instanceof LocalStoreBackedTableProvider) {
-          StorageEngine store = stores.get(ctx.tableSpec.getId());
-          if (store == null) {
-            throw new SamzaException(String.format(
-                "Backing store for table %s was not injected by SamzaContainer",
-                ctx.tableSpec.getId()));
-          }
-          ((LocalStoreBackedTableProvider) ctx.tableProvider).init(store);
-        }
-      });
-
-    localTablesInitialized = true;
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    Preconditions.checkNotNull(containerContext, "null container context.");
+    tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+    initialized = true;
   }
 
   /**
@@ -126,17 +120,10 @@ public class TableManager {
   }
 
   /**
-   * Start the table manager, internally it starts all tables
-   */
-  public void start() {
-    tables.values().forEach(ctx -> ctx.tableProvider.start());
-  }
-
-  /**
    * Shutdown the table manager, internally it shuts down all tables
    */
-  public void shutdown() {
-    tables.values().forEach(ctx -> ctx.tableProvider.stop());
+  public void close() {
+    tables.values().forEach(ctx -> ctx.tableProvider.close());
   }
 
   /**
@@ -145,9 +132,10 @@ public class TableManager {
    * @return table instance
    */
   public Table getTable(String tableId) {
-    if (!localTablesInitialized) {
-      throw new IllegalStateException("Local tables in TableManager not initialized.");
+    if (!initialized) {
+      throw new IllegalStateException("TableManager has not been initialized.");
     }
+    Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId);
     return tables.get(tableId).tableProvider.getTable();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java
new file mode 100644
index 0000000..0d30098
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+import org.apache.samza.operators.KV;
+
+
+/**
+ * Function interface for providing rate limiting credits for each table record.
+ * This interface allows callers to pass in lambda expressions which are otherwise
+ * non-serializable as-is.
+ * <p>
+ * Implementations must tolerate a {@link KV} whose key and/or value is null: read and
+ * delete operations supply no value, and flush supplies neither key nor value.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+@FunctionalInterface
+public interface CreditFunction<K, V> extends Function<KV<K, V>, Integer>, Serializable {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
new file mode 100644
index 0000000..a47e349
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
+
+
+/**
+ * Remote store backed table supporting both read and write operations.
+ * <p>
+ * Write operations are delegated to a {@link TableWriteFunction}. When the rate limiter
+ * supports {@link RemoteTableDescriptor#RL_WRITE_TAG}, every write (including batch writes,
+ * charged per entry) is throttled before being issued to the remote store.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
+  protected final TableWriteFunction<K, V> writeFn;
+  protected final CreditFunction<K, V> writeCreditFn;
+  protected final boolean rateLimitWrites;
+
+  protected Timer putNs;
+  protected Timer deleteNs;
+  protected Timer flushNs;
+  protected Timer putThrottleNs; // use single timer for all write operations
+  protected Counter numPuts;
+  protected Counter numDeletes;
+  protected Counter numFlushes;
+
+  /**
+   * Construct a RemoteReadWriteTable instance
+   * @param tableId table id
+   * @param readFn {@link TableReadFunction} for read operations
+   * @param writeFn {@link TableWriteFunction} for write operations
+   * @param rateLimiter optional {@link RateLimiter} for throttling reads and writes
+   * @param readCreditFn function returning a credit to be charged for rate limiting per record read
+   * @param writeCreditFn function returning a credit to be charged for rate limiting per record written
+   */
+  public RemoteReadWriteTable(String tableId, TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn,
+      RateLimiter rateLimiter, CreditFunction<K, V> readCreditFn, CreditFunction<K, V> writeCreditFn) {
+    super(tableId, readFn, rateLimiter, readCreditFn);
+    Preconditions.checkNotNull(writeFn, "null write function");
+    this.writeFn = writeFn;
+    this.writeCreditFn = writeCreditFn;
+    this.rateLimitWrites = rateLimiter != null && rateLimiter.getSupportedTags().contains(RL_WRITE_TAG);
+    logger.info("Rate limiting is {} for remote write operations", rateLimitWrites ? "enabled" : "disabled");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * In addition to the read metrics, registers the write metrics of this table.
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    super.init(containerContext, taskContext);
+    putNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-ns");
+    putThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-throttle-ns");
+    deleteNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-delete-ns");
+    flushNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-flush-ns");
+    numPuts = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-puts");
+    numDeletes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-deletes");
+    numFlushes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-flushes");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void put(K key, V value) {
+    try {
+      numPuts.inc();
+      if (rateLimitWrites) {
+        throttle(key, value, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      writeFn.put(key, value);
+      putNs.update(System.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to put a record, key=%s, value=%s", key, value);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    try {
+      // Charge credits per entry so batch writes cannot bypass the configured write rate limit
+      if (rateLimitWrites) {
+        for (Entry<K, V> entry : entries) {
+          throttle(entry.getKey(), entry.getValue(), RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+        }
+      }
+      writeFn.putAll(entries);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to put records: %s", entries);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void delete(K key) {
+    try {
+      numDeletes.inc();
+      if (rateLimitWrites) {
+        throttle(key, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      writeFn.delete(key);
+      deleteNs.update(System.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to delete a record, key=%s", key);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteAll(List<K> keys) {
+    try {
+      // Charge credits per key so batch deletes cannot bypass the configured write rate limit
+      if (rateLimitWrites) {
+        for (K key : keys) {
+          throttle(key, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+        }
+      }
+      writeFn.deleteAll(keys);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to delete records, keys=%s", keys);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void flush() {
+    try {
+      numFlushes.inc();
+      if (rateLimitWrites) {
+        throttle(null, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      writeFn.flush();
+      flushNs.update(System.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = "Failed to flush remote store";
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The write function is closed even if closing the read side fails.
+   */
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      writeFn.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
new file mode 100644
index 0000000..ca8e96b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
+
+
+/**
+ * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
+ * <p>
+ * Many stream-processing applications need to look up data from remote data sources, e.g. databases,
+ * web-services or RPC systems, to process messages in the stream. Such access to adjunct datasets can be
+ * naturally modeled as a join between the incoming stream and a {@link RemoteReadableTable}.
+ * <p>
+ * Example use-cases include:
+ * <ul>
+ *  <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
+ *  <li> Scoring page views with impressions services. </li>
+ *  <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
+ * </ul>
+ * <p>
+ * A {@link RemoteReadableTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
+ * which encapsulate the functionality of reading and writing data to the remote service. These provide a
+ * pluggable means to specify I/O operations on the table. Apart from optional client-side rate limiting,
+ * the base implementation merely delegates to these reader and writer functions; sub-classes of
+ * {@link RemoteReadableTable} may provide rich functionality like caching on top of them.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
+  protected final String tableId;
+  protected final Logger logger;
+  protected final TableReadFunction<K, V> readFn;
+  protected final String groupName;
+  protected final RateLimiter rateLimiter;
+  protected final CreditFunction<K, V> readCreditFn;
+  protected final boolean rateLimitReads;
+
+  // Metrics are created in init() because they are registered with the task's metrics registry
+  protected Timer getNs;
+  protected Timer getThrottleNs;
+  protected Counter numGets;
+
+  /**
+   * Construct a RemoteReadableTable instance
+   * @param tableId table id
+   * @param readFn {@link TableReadFunction} for read operations
+   * @param rateLimiter optional {@link RateLimiter} for throttling reads
+   * @param readCreditFn function returning a credit to be charged for rate limiting per record
+   */
+  public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn, RateLimiter rateLimiter,
+      CreditFunction<K, V> readCreditFn) {
+    Preconditions.checkArgument(tableId != null && !tableId.isEmpty(), "invalid table id");
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.tableId = tableId;
+    this.readFn = readFn;
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.groupName = getClass().getSimpleName();
+    this.logger = LoggerFactory.getLogger(groupName + tableId);
+    this.rateLimitReads = rateLimiter != null && rateLimiter.getSupportedTags().contains(RL_READ_TAG);
+    logger.info("Rate limiting is {} for remote read operations", rateLimitReads ? "enabled" : "disabled");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Registers the read metrics of this table with the task's metrics registry.
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    // ReadableTable#init permits a null taskContext for global tables, but the metrics here are
+    // registered per task; fail fast with a descriptive message rather than a bare NPE below.
+    Preconditions.checkNotNull(taskContext, "null task context; remote tables require a task context");
+    getNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-ns");
+    getThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-throttle-ns");
+    numGets = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-gets");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public V get(K key) {
+    try {
+      numGets.inc();
+      if (rateLimitReads) {
+        throttle(key, null, RL_READ_TAG, readCreditFn, getThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      V result = readFn.get(key);
+      getNs.update(System.nanoTime() - startNs);
+      return result;
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to get a record, key=%s", key);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Fails if the read function returns null or fewer records than the number of distinct keys requested.
+   */
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Preconditions.checkNotNull(keys, "null keys");
+    Map<K, V> result;
+    try {
+      // Charge credits per key so batch reads cannot bypass the configured read rate limit
+      if (rateLimitReads) {
+        for (K key : keys) {
+          throttle(key, null, RL_READ_TAG, readCreditFn, getThrottleNs);
+        }
+      }
+      result = readFn.getAll(keys);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to get some records, keys=%s", keys);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+
+    if (result == null) {
+      String errMsg = String.format("Received null records, keys=%s", keys);
+      logger.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
+
+    // Duplicate keys collapse into a single map entry, so compare against the distinct key count
+    long numDistinctKeys = keys.stream().distinct().count();
+    if (result.size() < numDistinctKeys) {
+      String errMsg = String.format("Received insufficient number of records (%d), keys=%s", result.size(), keys);
+      logger.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
+
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    readFn.close();
+  }
+
+  /**
+   * Throttle requests given a table record (key, value) with rate limiter and credit function
+   * @param key key of the table record (nullable)
+   * @param value value of the table record (nullable)
+   * @param tag tag for rate limiter
+   * @param creditFn mapper function from KV to credits to be charged; one credit is charged when null
+   * @param timer timer metric to track throttling delays
+   */
+  protected void throttle(K key, V value, String tag, CreditFunction<K, V> creditFn, Timer timer) {
+    long startNs = System.nanoTime();
+    int credits = (creditFn == null) ? 1 : creditFn.apply(KV.of(key, value));
+    rateLimiter.acquire(Collections.singletonMap(tag, credits));
+    timer.update(System.nanoTime() - startNs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
new file mode 100644
index 0000000..7bc369d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.EmbeddedTaggedRateLimiter;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for remote store backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
+  /**
+   * Tag to be used for provision credits for rate limiting read operations from the remote table.
+   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}
+   */
+  public static final String RL_READ_TAG = "readTag";
+
+  /**
+   * Tag to be used for provision credits for rate limiting write operations into the remote table.
+   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}
+   * and it needs the write functionality.
+   */
+  public static final String RL_WRITE_TAG = "writeTag";
+
+  // Input support for a specific remote store (required)
+  private TableReadFunction<K, V> readFn;
+
+  // Output support for a specific remote store (optional)
+  private TableWriteFunction<K, V> writeFn;
+
+  // Rate limiter for client-side throttling;
+  // can either be constructed indirectly from rates or overridden by withRateLimiter()
+  private RateLimiter rateLimiter;
+
+  // Rates for constructing the default rate limiter when they are non-zero
+  private Map<String, Integer> tagCreditsMap = new HashMap<>();
+
+  private CreditFunction<K, V> readCreditFn;
+  private CreditFunction<K, V> writeCreditFn;
+
+  /**
+   * Construct a table descriptor instance
+   * @param tableId Id of the table
+   */
+  public RemoteTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Validates this descriptor and builds the {@link TableSpec} for the remote table. The read and
+   * write functions, the rate limiter and the credit functions are Java-serialized, Base64-encoded
+   * and stored in the table spec config under the keys defined by {@link RemoteTableProvider}.
+   *
+   * <p>The descriptor itself is not modified, so this method can safely be called more than once.
+   *
+   * @return the table spec for this remote table
+   */
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    // Serialize and store reader/writer functions
+    tableSpecConfig.put(RemoteTableProvider.READ_FN, serializeObject("read function", readFn));
+
+    if (writeFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_FN, serializeObject("write function", writeFn));
+    }
+
+    // Build the default rate limiter from the read/write rates into a local variable. Storing it in
+    // the rateLimiter field would make a second getTableSpec() call fail validate(), which rejects
+    // having both a rate limiter instance and read/write rates.
+    RateLimiter effectiveRateLimiter = tagCreditsMap.isEmpty()
+        ? rateLimiter
+        : new EmbeddedTaggedRateLimiter(tagCreditsMap);
+
+    if (effectiveRateLimiter != null) {
+      tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, serializeObject("rate limiter", effectiveRateLimiter));
+    }
+
+    // Serialize the readCredit and writeCredit functions
+    if (readCreditFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, serializeObject(
+          "read credit function", readCreditFn));
+    }
+
+    if (writeCreditFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, serializeObject(
+          "write credit function", writeCreditFn));
+    }
+
+    return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  /**
+   * Sets the {@link TableReadFunction} used to read records from the remote store. A read
+   * function is mandatory; {@code validate()} fails without one.
+   *
+   * @param readFn read function; must not be null
+   * @return this descriptor, for call chaining
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
+    this.readFn = Preconditions.checkNotNull(readFn, "null read function");
+    return this;
+  }
+
+  /**
+   * Sets the {@link TableWriteFunction} used to write records to the remote store. Optional; when
+   * no write function is set, only a read function is placed in the table spec.
+   *
+   * @param writeFn write function; must not be null
+   * @return this descriptor, for call chaining
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
+    this.writeFn = Preconditions.checkNotNull(writeFn, "null write function");
+    return this;
+  }
+
+  /**
+   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
+   * of credits to be charged from the rate limiter for table read and write operations.
+   * This is an advanced API that provides greater flexibility to throttle each record in the table
+   * with different number of credits. For most common use-cases eg: limit the number of read/write
+   * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
+   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}. Combining this method with either of
+   * those is rejected when the table spec is built.
+   *
+   * @param rateLimiter rate limiter instance to be used for throttling of both reads and writes;
+   *        must not be null
+   * @param readCreditFn credit function for rate limiting read operations; not null-checked here
+   * @param writeCreditFn credit function for rate limiting write operations; not null-checked here
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter, CreditFunction<K, V> readCreditFn,
+      CreditFunction<K, V> writeCreditFn) {
+    Preconditions.checkNotNull(rateLimiter, "null rate limiter");
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.writeCreditFn = writeCreditFn;
+    return this;
+  }
+
+  /**
+   * Throttles table reads to {@code creditsPerSec} credits per second using the built-in
+   * {@link EmbeddedTaggedRateLimiter}. {@code validate()} rejects combining this with
+   * {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}.
+   *
+   * @param creditsPerSec read rate limit; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
+    boolean isPositive = creditsPerSec > 0;
+    Preconditions.checkArgument(isPositive, "Max read rate must be a positive number.");
+    this.tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Throttles table writes to {@code creditsPerSec} credits per second using the built-in
+   * {@link EmbeddedTaggedRateLimiter}. {@code validate()} rejects combining this with
+   * {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}.
+   *
+   * @param creditsPerSec write rate limit; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
+    boolean isPositive = creditsPerSec > 0;
+    Preconditions.checkArgument(isPositive, "Max write rate must be a positive number.");
+    this.tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Java-serializes {@code object} and encodes the resulting bytes as a Base64 string.
+   *
+   * @param name human-readable name of the object, used only in the error message
+   * @param object object to serialize
+   * @return Base64 encoding of the serialized object
+   * @throws SamzaException if serialization fails with an {@link IOException}
+   */
+  private <T> String serializeObject(String name, T object) {
+    final byte[] serialized;
+    try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
+      objectStream.writeObject(object);
+      serialized = byteStream.toByteArray();
+    } catch (IOException e) {
+      throw new SamzaException("Failed to serialize " + name, e);
+    }
+    return Base64.getEncoder().encodeToString(serialized);
+  }
+
+  /**
+   * Checks the base descriptor, requires a read function, and rejects descriptors that set both an
+   * explicit rate limiter and read/write rate limits.
+   */
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+    boolean conflictingLimits = rateLimiter != null && !tagCreditsMap.isEmpty();
+    Preconditions.checkArgument(!conflictingLimits,
+        "Only one of rateLimiter instance or read/write limits can be specified");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
new file mode 100644
index 0000000..8b9001a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provider for remote table instances. The read/write functions, the optional rate limiter and the
+ * optional credit functions are read from the {@link TableSpec} config, where they are stored as
+ * Base64-encoded Java-serialized objects.
+ */
+public class RemoteTableProvider implements TableProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteTableProvider.class);
+
+  // Keys under which the serialized objects are stored in the table spec config
+  static final String READ_FN = "io.readFn";
+  static final String WRITE_FN = "io.writeFn";
+  static final String RATE_LIMITER = "io.ratelimiter";
+  static final String READ_CREDIT_FN = "io.readCreditFn";
+  static final String WRITE_CREDIT_FN = "io.writeCreditFn";
+
+  private final TableSpec tableSpec;
+  // Tables are read-only when the spec carries no write function
+  private final boolean readOnly;
+  // Every table handed out by getTable(), so that close() can close them all
+  private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
+  private SamzaContainerContext containerContext;
+  private TaskContext taskContext;
+
+  public RemoteTableProvider(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+    readOnly = !tableSpec.getConfig().containsKey(WRITE_FN);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    this.containerContext = containerContext;
+    this.taskContext = taskContext;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Each call deserializes and initializes new function and rate limiter instances and creates
+   * a new table, which is remembered so {@link #close()} can close it. Must be called after
+   * {@link #init(SamzaContainerContext, TaskContext)}, whose contexts are passed on to the
+   * functions, the rate limiter and the table.
+   */
+  @Override
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public Table getTable() {
+    RemoteReadableTable table;
+    TableReadFunction<?, ?> readFn = getReadFn();
+    RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
+    if (rateLimiter != null) {
+      rateLimiter.init(containerContext.config, taskContext);
+    }
+    CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
+    if (readOnly) {
+      table = new RemoteReadableTable(tableSpec.getId(), readFn, rateLimiter, readCreditFn);
+    } else {
+      CreditFunction<?, ?> writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
+      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), rateLimiter, readCreditFn, writeCreditFn);
+    }
+    table.init(containerContext, taskContext);
+    tables.add(table);
+    return table;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Every table spec config entry is emitted under the table's id prefix. The {@code config}
+   * argument is not consulted.
+   */
+  @Override
+  public Map<String, String> generateConfig(Map<String, String> config) {
+    Map<String, String> tableConfig = new HashMap<>();
+
+    // Insert table_id prefix to config entries
+    String keyPrefix = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + ".";
+    tableSpec.getConfig().forEach((k, v) -> tableConfig.put(keyPrefix + k, v));
+
+    LOG.info("Generated configuration for table {}", tableSpec.getId());
+
+    return tableConfig;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Closes every table created by {@link #getTable()}.
+   */
+  @Override
+  public void close() {
+    tables.forEach(t -> t.close());
+  }
+
+  /**
+   * Deserializes the Base64-encoded, Java-serialized object stored under {@code key} in the table
+   * spec config.
+   *
+   * <p>NOTE: this relies on Java native deserialization, so the table spec config must come from a
+   * trusted source.
+   *
+   * @param key config key to read
+   * @return the deserialized object, or null if the key is absent or its value is empty
+   * @throws SamzaException if the value cannot be decoded or deserialized
+   */
+  @SuppressWarnings("unchecked")
+  private <T> T deserializeObject(String key) {
+    String entry = tableSpec.getConfig().getOrDefault(key, "");
+    if (entry.isEmpty()) {
+      return null;
+    }
+
+    // try-with-resources so the object stream is always closed
+    try (ObjectInputStream ois = new ObjectInputStream(
+        new ByteArrayInputStream(Base64.getDecoder().decode(entry)))) {
+      return (T) ois.readObject();
+    } catch (Exception e) {
+      throw new SamzaException("Failed to deserialize " + key, e);
+    }
+  }
+
+  /**
+   * Deserializes the read function and initializes it with the container config and task context.
+   */
+  private TableReadFunction<?, ?> getReadFn() {
+    TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
+    if (readFn != null) {
+      readFn.init(containerContext.config, taskContext);
+    }
+    return readFn;
+  }
+
+  /**
+   * Deserializes the write function and initializes it with the container config and task context.
+   */
+  private TableWriteFunction<?, ?> getWriteFn() {
+    TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
+    if (writeFn != null) {
+      writeFn.init(containerContext.config, taskContext);
+    }
+    return writeFn;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
new file mode 100644
index 0000000..0eb88fd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * {@link TableProviderFactory} that creates a {@link RemoteTableProvider} for a table spec.
+ */
+public class RemoteTableProviderFactory implements TableProviderFactory {
+  /**
+   * Creates a provider for the remote table described by {@code tableSpec}.
+   *
+   * @param tableSpec spec of the table; must not be null
+   * @return a new {@link RemoteTableProvider}
+   */
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    TableSpec spec = Preconditions.checkNotNull(tableSpec, "null table spec");
+    return new RemoteTableProvider(spec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
new file mode 100644
index 0000000..dbd386c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+
+
+/**
+ * Function object used by a {@link RemoteReadableTable} implementation to read table record(s)
+ * from a remote store for a given key or set of keys.
+ *
+ * <p> Instances of {@link TableReadFunction} must be serializable: any non-serializable state
+ * (eg: network sockets) should be declared transient and re-created inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+  /**
+   * Reads the single table record stored under {@code key}. This method must be thread-safe.
+   * @param key key for the table record
+   * @return table record for the specified {@code key}
+   */
+  V get(K key);
+
+  /**
+   * Reads the table records for all {@code keys}. This method must be thread-safe.
+   *
+   * <p>The default implementation calls {@link TableReadFunction#get(Object)} once per key and puts
+   * every result, including null, into a new map; an exception thrown by {@code get} propagates.
+   * @param keys keys for the table records
+   * @return all records for the specified keys if succeeded; depending on the implementation
+   * of {@link TableReadFunction#get(Object)} it either returns records for a subset of the
+   * keys or throws exception when there is any failure.
+   */
+  default Map<K, V> getAll(Collection<K> keys) {
+    Map<K, V> result = new HashMap<>();
+    for (K key : keys) {
+      V value = get(key);
+      result.put(key, value);
+    }
+    return result;
+  }
+
+  // Implementations may define readObject() to re-create transient state after deserialization
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
new file mode 100644
index 0000000..3fb8fda
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.storage.kv.Entry;
+
+
+/**
+ * A function object to be used with a {@link RemoteReadWriteTable} implementation. It encapsulates the functionality
+ * of writing table record(s) for a provided set of key(s) to the store.
+ *
+ * <p> Instances of {@link TableWriteFunction} are meant to be serializable. ie. any non-serializable state
+ * (eg: network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+  /**
+   * Store single table {@code record} with specified {@code key}. This method must be thread-safe.
+   * @param key key for the table record
+   * @param record table record to be written
+   */
+  void put(K key, V record);
+
+  /**
+   * Store the given table {@code records}, each entry carrying its own key and value.
+   * This method must be thread-safe. The default implementation calls
+   * {@link TableWriteFunction#put(Object, Object)} once per entry, in list order.
+   * @param records table records to be written
+   */
+  default void putAll(List<Entry<K, V>> records) {
+    records.forEach(e -> put(e.getKey(), e.getValue()));
+  }
+
+  /**
+   * Delete the {@code record} with specified {@code key} from the remote store
+   * @param key key to the table record to be deleted
+   */
+  void delete(K key);
+
+  /**
+   * Delete all {@code records} with the specified {@code keys} from the remote store.
+   * The default implementation calls {@link TableWriteFunction#delete(Object)} once per key.
+   * @param keys keys for the table records to be deleted
+   */
+  default void deleteAll(Collection<K> keys) {
+    keys.forEach(this::delete);
+  }
+
+  /**
+   * Flush the remote store (optional). The default implementation does nothing.
+   */
+  default void flush() {
+  }
+
+  // optionally implement readObject() to initialize transient states
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java
deleted file mode 100644
index 9ccf2f4..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * An embedded rate limiter backed by a single Guava {@code RateLimiter}. The rate passed to the
- * constructor is divided by the number of task names in the container context during
- * {@link #init(Config, TaskContext)}. Only the untagged methods are supported; the tag-map
- * variants throw {@link IllegalArgumentException}.
- */
-public class EmbeddedRateLimiter implements RateLimiter {
-
-  static final private Logger LOGGER = LoggerFactory.getLogger(EmbeddedRateLimiter.class);
-
-  // Rate requested by the user, before it is split across tasks in init()
-  private final int targetRate;
-  // Created by init(); null until then (see ensureInitialized())
-  private com.google.common.util.concurrent.RateLimiter rateLimiter;
-
-  public EmbeddedRateLimiter(int creditsPerSecond) {
-    this.targetRate = creditsPerSecond;
-  }
-
-  @Override
-  public void acquire(int numberOfCredits) {
-    ensureInitialized();
-    rateLimiter.acquire(numberOfCredits);
-  }
-
-  // Returns numberOfCredits if the underlying tryAcquire succeeded within the timeout, else 0
-  @Override
-  public int acquire(int numberOfCredits, long timeout, TimeUnit unit) {
-    ensureInitialized();
-    return rateLimiter.tryAcquire(numberOfCredits, timeout, unit)
-        ? numberOfCredits
-        : 0;
-  }
-
-  // Returns numberOfCredits if the underlying non-waiting tryAcquire succeeded, else 0
-  @Override
-  public int tryAcquire(int numberOfCredits) {
-    ensureInitialized();
-    return rateLimiter.tryAcquire(numberOfCredits)
-        ? numberOfCredits
-        : 0;
-  }
-
-  // Tagged acquisition is not supported by this limiter
-  @Override
-  public void acquire(Map<String, Integer> tagToCreditsMap) {
-    throw new IllegalArgumentException("This method is not applicable");
-  }
-
-  @Override
-  public Map<String, Integer> acquire(Map<String, Integer> tagToCreditsMap, long timeout, TimeUnit unit) {
-    throw new IllegalArgumentException("This method is not applicable");
-  }
-
-  @Override
-  public Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditsMap) {
-    throw new IllegalArgumentException("This method is not applicable");
-  }
-
-  /**
-   * Creates the underlying Guava rate limiter. When a task context is available, the target rate is
-   * divided (integer division) by the number of task names in the container context.
-   */
-  @Override
-  public void init(Config config, TaskContext taskContext) {
-    int effectiveRate = targetRate;
-    if (taskContext != null) {
-      // NOTE(review): integer division yields 0 when targetRate < number of tasks; Guava's
-      // RateLimiter.create is documented to reject non-positive rates -- confirm intended behavior
-      effectiveRate /= taskContext.getSamzaContainerContext().taskNames.size();
-      LOGGER.info(String.format("Effective rate limit for task %s is %d",
-          taskContext.getTaskName(), effectiveRate));
-    }
-    this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(effectiveRate);
-  }
-
-  // Every untagged acquire variant calls this first; rateLimiter is only assigned by init()
-  private void ensureInitialized() {
-    Preconditions.checkState(rateLimiter != null, "Not initialized");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
index 9c20eee..1cf9a9c 100644
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
@@ -20,6 +20,7 @@ package org.apache.samza.util;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -36,17 +37,25 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
 /**
- * An embedded rate limiter that supports tags
+ * An embedded rate limiter that supports tags. For simple use cases where only a single rate is specified,
+ * a default tag is used.
  */
 public class EmbeddedTaggedRateLimiter implements RateLimiter {
-
   static final private Logger LOGGER = LoggerFactory.getLogger(EmbeddedTaggedRateLimiter.class);
+  private static final String DEFAULT_TAG = "default-tag";
+  private static final Map<String, Integer> DEFAULT_TAG_MAP = Collections.singletonMap(DEFAULT_TAG, 0);
 
   private final Map<String, Integer> tagToTargetRateMap;
   private Map<String, com.google.common.util.concurrent.RateLimiter> tagToRateLimiterMap;
+  private boolean initialized;
+
+  /**
+   * Creates a rate limiter with a single rate, tracked under the default tag that the untagged
+   * {@code acquire} methods use.
+   *
+   * @param creditsPerSecond target rate in credits per second; must be positive
+   */
+  public EmbeddedTaggedRateLimiter(int creditsPerSecond) {
+    this(Collections.singletonMap(DEFAULT_TAG, creditsPerSecond));
+  }
 
+  /**
+   * Creates a rate limiter with a separate target rate per tag.
+   *
+   * @param tagToCreditsPerSecondMap target rate in credits per second for each tag; must not be
+   *        empty and every rate must be positive
+   */
   public EmbeddedTaggedRateLimiter(Map<String, Integer> tagToCreditsPerSecondMap) {
     Preconditions.checkArgument(tagToCreditsPerSecondMap.size() > 0, "Map of tags can't be empty");
+    // init() hands each rate to Guava's RateLimiter.create(), which rejects non-positive rates,
+    // so fail fast here rather than at init time
+    tagToCreditsPerSecondMap.values().forEach(c -> Preconditions.checkArgument(c > 0, "Credits must be positive"));
     this.tagToTargetRateMap = tagToCreditsPerSecondMap;
   }
 
@@ -72,39 +81,28 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
             int availableCredits = rateLimiter.tryAcquire(requiredCredits, remainingTimeoutInNanos, NANOSECONDS)
                 ? requiredCredits
                 : 0;
-            return new ImmutablePair<String, Integer>(tag, availableCredits);
+            return new ImmutablePair<>(tag, availableCredits);
           })
         .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue));
   }
 
   @Override
-  public Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditsMap) {
-    ensureTagsAreValid(tagToCreditsMap);
-    return tagToCreditsMap.entrySet().stream()
-        .map(e -> {
-            String tag = e.getKey();
-            int requiredCredits = e.getValue();
-            int availableCredits = tagToRateLimiterMap.get(tag).tryAcquire(requiredCredits)
-                ? requiredCredits
-                : 0;
-            return new ImmutablePair<String, Integer>(tag, availableCredits);
-          })
-        .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue));
+  public Set<String> getSupportedTags() {
+    // tagToRateLimiterMap is only populated by init(); report that clearly instead of an NPE
+    ensureInitialized();
+    return Collections.unmodifiableSet(tagToRateLimiterMap.keySet());
   }
 
   @Override
   public void acquire(int numberOfCredits) {
-    throw new IllegalArgumentException("This method is not applicable");
+    // Untagged requests are charged against the default tag's limiter
+    ensureTagsAreValid(DEFAULT_TAG_MAP);
+    com.google.common.util.concurrent.RateLimiter defaultLimiter = tagToRateLimiterMap.get(DEFAULT_TAG);
+    defaultLimiter.acquire(numberOfCredits);
   }
 
   @Override
   public int acquire(int numberOfCredit, long timeout, TimeUnit unit) {
-    throw new IllegalArgumentException("This method is not applicable");
-  }
-
-  @Override
-  public int tryAcquire(int numberOfCredit) {
-    throw new IllegalArgumentException("This method is not applicable");
+    // Untagged requests use the default tag; returns all requested credits or none
+    ensureTagsAreValid(DEFAULT_TAG_MAP);
+    boolean acquired = tagToRateLimiterMap.get(DEFAULT_TAG).tryAcquire(numberOfCredit, timeout, unit);
+    if (!acquired) {
+      return 0;
+    }
+    return numberOfCredit;
   }
 
   @Override
@@ -118,15 +116,15 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
               LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d",
                   taskContext.getTaskName(), tag, effectiveRate));
             }
-            return new ImmutablePair<String, com.google.common.util.concurrent.RateLimiter>(
-                tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate));
+            return new ImmutablePair<>(tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate));
           })
         .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue))
     );
+    initialized = true;
   }
 
   private void ensureInitialized() {
-    Preconditions.checkState(tagToRateLimiterMap != null, "Not initialized");
+    // The flag is set at the end of init(), once all per-tag limiters exist
+    if (!initialized) {
+      throw new IllegalStateException("Not initialized");
+    }
   }
 
   private void ensureTagsAreValid(Map<String, ?> tagMap) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 789d75b..7cc1924 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -538,9 +538,8 @@ object SamzaContainer extends Logging {
         new SystemClock)
 
       val tableManager = new TableManager(config, serdes.asJava)
-      tableManager.initLocalTables(taskStores.asJava)
 
-      info("Got table manager");
+      info("Got table manager")
 
       val systemStreamPartitions = taskModel
         .getSystemStreamPartitions

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index cb73c5d..3ac37c6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -100,7 +100,7 @@ class TaskInstance(
     if (tableManager != null) {
       debug("Starting table manager for taskName: %s" format taskName)
 
-      tableManager.start
+      tableManager.init(containerContext, context)
     } else {
       debug("Skipping table manager initialization for taskName: %s" format taskName)
     }
@@ -244,7 +244,7 @@ class TaskInstance(
     if (tableManager != null) {
       debug("Shutting down table manager for taskName: %s" format taskName)
 
-      tableManager.shutdown
+      tableManager.close
     } else {
       debug("Skipping table manager shutdown for taskName: %s" format taskName)
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index df5b9e5..24178d0 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -27,11 +27,13 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -49,13 +51,13 @@ public class TestTableManager {
 
   public static class DummyTableProviderFactory implements TableProviderFactory {
 
-    static Table table;
-    static LocalStoreBackedTableProvider tableProvider;
+    // Mocks created by the most recent getTableProvider() call; the test verifies against them
+    static ReadableTable table;
+    static TableProvider tableProvider;
 
     @Override
     public TableProvider getTableProvider(TableSpec tableSpec) {
-      table = mock(Table.class);
-      tableProvider = mock(LocalStoreBackedTableProvider.class);
+      // Each call replaces both mocks; the provider's getTable() is stubbed to return the mock table
+      table = mock(ReadableTable.class);
+      tableProvider = mock(TableProvider.class);
       when(tableProvider.getTable()).thenReturn(table);
       return tableProvider;
     }
@@ -120,10 +122,10 @@ public class TestTableManager {
           });
 
     TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
-    tableManager.initLocalTables(storageEngines);
+    tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class));
 
     Table table = tableManager.getTable(TABLE_ID);
-    verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject());
+    verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
     Assert.assertEquals(DummyTableProviderFactory.table, table);
 
     Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables");

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
new file mode 100644
index 0000000..acf3d61
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.EmbeddedTaggedRateLimiter;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteTableDescriptor {
+  private void doTestSerialize(RateLimiter rateLimiter,
+      CreditFunction readCredFn,
+      CreditFunction writeCredFn) {
+    RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+    desc.withReadFunction(mock(TableReadFunction.class));
+    desc.withWriteFunction(mock(TableWriteFunction.class));
+    if (rateLimiter != null) {
+      desc.withRateLimiter(rateLimiter, readCredFn, writeCredFn);
+    } else {
+      desc.withReadRateLimit(100);
+      desc.withWriteRateLimit(200);
+    }
+    TableSpec spec = desc.getTableSpec();
+    Assert.assertTrue(spec.getConfig().containsKey(RemoteTableProvider.RATE_LIMITER));
+    Assert.assertEquals(readCredFn != null, spec.getConfig().containsKey(RemoteTableProvider.READ_CREDIT_FN));
+    Assert.assertEquals(writeCredFn != null, spec.getConfig().containsKey(RemoteTableProvider.WRITE_CREDIT_FN));
+  }
+
+  @Test
+  public void testSerializeSimple() {
+    doTestSerialize(null, null, null);
+  }
+
+  @Test
+  public void testSerializeWithLimiter() {
+    doTestSerialize(mock(RateLimiter.class), null, null);
+  }
+
+  @Test
+  public void testSerializeWithLimiterAndReadCredFn() {
+    doTestSerialize(mock(RateLimiter.class), kv -> 1, null);
+  }
+
+  @Test
+  public void testSerializeWithLimiterAndWriteCredFn() {
+    doTestSerialize(mock(RateLimiter.class), null, kv -> 1);
+  }
+
+  @Test
+  public void testSerializeWithLimiterAndReadWriteCredFns() {
+    doTestSerialize(mock(RateLimiter.class), kv -> 1, kv -> 1);
+  }
+
+  @Test
+  public void testSerializeNullWriteFunction() {
+    RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+    desc.withReadFunction(mock(TableReadFunction.class));
+    TableSpec spec = desc.getTableSpec();
+    Assert.assertTrue(spec.getConfig().containsKey(RemoteTableProvider.READ_FN));
+    Assert.assertFalse(spec.getConfig().containsKey(RemoteTableProvider.WRITE_FN));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testSerializeNullReadFunction() {
+    RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+    TableSpec spec = desc.getTableSpec();
+    Assert.assertTrue(spec.getConfig().containsKey(RemoteTableProvider.READ_FN));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSpecifyBothRateAndRateLimiter() {
+    RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+    desc.withReadFunction(mock(TableReadFunction.class));
+    desc.withReadRateLimit(100);
+    desc.withRateLimiter(mock(RateLimiter.class), null, null);
+    desc.getTableSpec();
+  }
+
+  private TaskContext createMockTaskContext() {
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
+    doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
+    TaskContext taskContext = mock(TaskContext.class);
+    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+    SamzaContainerContext containerCtx = new SamzaContainerContext(
+        "1", null, Collections.singleton(new TaskName("MyTask")), null);
+    doReturn(containerCtx).when(taskContext).getSamzaContainerContext();
+    return taskContext;
+  }
+
+  static class CountingCreditFunction<K, V> implements CreditFunction<K, V> {
+    int numCalls = 0;
+    @Override
+    public Integer apply(KV<K, V> kv) {
+      numCalls++;
+      return 1;
+    }
+  }
+
+  private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) {
+    int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0);
+    RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1");
+    desc.withReadFunction(mock(TableReadFunction.class));
+    desc.withWriteFunction(mock(TableWriteFunction.class));
+    if (rateOnly) {
+      if (rlGets) {
+        desc.withReadRateLimit(1000);
+      }
+      if (rlPuts) {
+        desc.withWriteRateLimit(2000);
+      }
+    } else {
+      if (numRateLimitOps > 0) {
+        Map<String, Integer> tagCredits = new HashMap<>();
+        if (rlGets) {
+          tagCredits.put(RL_READ_TAG, 1000);
+        }
+        if (rlPuts) {
+          tagCredits.put(RL_WRITE_TAG, 2000);
+        }
+
+        // Spy the rate limiter to verify call count
+        RateLimiter rateLimiter = spy(new EmbeddedTaggedRateLimiter(tagCredits));
+        desc.withRateLimiter(rateLimiter, new CountingCreditFunction(), new CountingCreditFunction());
+      }
+    }
+
+    TableSpec spec = desc.getTableSpec();
+    RemoteTableProvider provider = new RemoteTableProvider(spec);
+    provider.init(mock(SamzaContainerContext.class), createMockTaskContext());
+    Table table = provider.getTable();
+    Assert.assertTrue(table instanceof RemoteReadWriteTable);
+    RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+    Assert.assertNotNull(rwTable.readFn);
+    Assert.assertNotNull(rwTable.writeFn);
+    if (numRateLimitOps > 0) {
+      Assert.assertNotNull(rwTable.rateLimiter);
+    }
+
+    // Verify rate limiter usage
+    if (numRateLimitOps > 0) {
+      rwTable.get("xxx");
+      rwTable.put("yyy", "zzz");
+
+      if (!rateOnly) {
+        verify(rwTable.rateLimiter, times(numRateLimitOps)).acquire(anyMap());
+
+        CountingCreditFunction<?, ?> readCreditFn = (CountingCreditFunction<?, ?>) rwTable.readCreditFn;
+        CountingCreditFunction<?, ?> writeCreditFn = (CountingCreditFunction<?, ?>) rwTable.writeCreditFn;
+
+        Assert.assertNotNull(readCreditFn);
+        Assert.assertNotNull(writeCreditFn);
+
+        Assert.assertEquals(readCreditFn.numCalls, rlGets ? 1 : 0);
+        Assert.assertEquals(writeCreditFn.numCalls, rlPuts ? 1 : 0);
+      } else {
+        Assert.assertTrue(rwTable.rateLimiter instanceof EmbeddedTaggedRateLimiter);
+        Assert.assertEquals(rwTable.rateLimiter.getSupportedTags().size(), numRateLimitOps);
+        if (rlGets) {
+          Assert.assertTrue(rwTable.rateLimiter.getSupportedTags().contains(RL_READ_TAG));
+        }
+        if (rlPuts) {
+          Assert.assertTrue(rwTable.rateLimiter.getSupportedTags().contains(RL_WRITE_TAG));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testDeserializeReadFunctionNoRateLimit() {
+    doTestDeserializeReadFunctionAndLimiter(false, false, false);
+  }
+
+  @Test
+  public void testDeserializeReadFunctionAndLimiterWrite() {
+    doTestDeserializeReadFunctionAndLimiter(false, false, true);
+  }
+
+  @Test
+  public void testDeserializeReadFunctionAndLimiterRead() {
+    doTestDeserializeReadFunctionAndLimiter(false, true, false);
+  }
+
+  @Test
+  public void testDeserializeReadFunctionAndLimiterReadWrite() {
+    doTestDeserializeReadFunctionAndLimiter(false, true, true);
+  }
+
+  @Test
+  public void testDeserializeReadFunctionAndLimiterRateOnlyWrite() {
+    doTestDeserializeReadFunctionAndLimiter(true, false, true);
+  }
+
+  @Test
+  public void testDeserializeReadFunctionAndLimiterRateOnlyRead() {
+    doTestDeserializeReadFunctionAndLimiter(true, true, false);
+  }
+
+  @Test
+  public void testDeserializeReadFunctionAndLimiterRateOnlyReadWrite() {
+    doTestDeserializeReadFunctionAndLimiter(true, true, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java
deleted file mode 100644
index 1b3f687..0000000
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Tests for {@link EmbeddedRateLimiter}. The limiter is constructed with {@code TARGET_RATE} and
- * initialized with a container context reporting {@code NUMBER_OF_TASKS} tasks. The rate checks
- * expect each task to observe {@code TARGET_RATE_PER_TASK} credits per second.
- *
- * <p>NOTE(review): the rate tests measure wall-clock throughput over {@code TEST_INTERVAL} ms, so
- * they may be sensitive to machine load.
- */
-public class TestEmbeddedRateLimiter {
-
-  final static private int TEST_INTERVAL = 200; // ms
-  final static private int TARGET_RATE = 4000;
-  final static private int NUMBER_OF_TASKS = 2;
-  final static private int TARGET_RATE_PER_TASK = TARGET_RATE / NUMBER_OF_TASKS;
-  final static private int INCREMENT = 2;
-
-  // Calls blocking acquire(INCREMENT) in a loop for TEST_INTERVAL ms, then checks that the
-  // observed credits-per-second rate is close to the per-task target.
-  @Test
-  public void testAcquire() {
-    RateLimiter rateLimiter = new EmbeddedRateLimiter(TARGET_RATE);
-    initRateLimiter(rateLimiter);
-
-    int count = 0;
-    long start = System.currentTimeMillis();
-    while (System.currentTimeMillis() - start < TEST_INTERVAL) {
-      rateLimiter.acquire(INCREMENT);
-      count += INCREMENT;
-    }
-
-    long rate = count * 1000 / TEST_INTERVAL;
-    verifyRate(rate);
-  }
-
-  // Uses non-blocking tryAcquire(INCREMENT). A return value <= 0 counts as a throttled call and
-  // any other value as a success. Expects at least one throttled call, and expects the rate of
-  // successfully acquired credits to be close to the per-task target.
-  @Test
-  public void testTryAcquire() {
-    RateLimiter rateLimiter = new EmbeddedRateLimiter(TARGET_RATE);
-    initRateLimiter(rateLimiter);
-
-    boolean hasSeenZeros = false;
-
-    int count = 0;
-    long start = System.currentTimeMillis();
-    while (System.currentTimeMillis() - start < TEST_INTERVAL) {
-      int availableCredits = rateLimiter.tryAcquire(INCREMENT);
-      if (availableCredits <= 0) {
-        hasSeenZeros = true;
-      } else {
-        count += INCREMENT;
-      }
-    }
-
-    long rate = count * 1000 / TEST_INTERVAL;
-    verifyRate(rate);
-    Assert.assertTrue(hasSeenZeros);
-  }
-
-  // Uses acquire(INCREMENT, 20, MILLISECONDS). Expects three things: no call returns <= 0, the
-  // acquired rate is close to the per-task target, and the number of calls is within 2 of the
-  // number of INCREMENT-sized acquisitions the per-task rate allows in TEST_INTERVAL.
-  @Test
-  public void testAcquireWithTimeout() {
-    RateLimiter rateLimiter = new EmbeddedRateLimiter(TARGET_RATE);
-    initRateLimiter(rateLimiter);
-
-    boolean hasSeenZeros = false;
-
-    int count = 0;
-    int callCount = 0;
-    long start = System.currentTimeMillis();
-    while (System.currentTimeMillis() - start < TEST_INTERVAL) {
-      ++callCount;
-      int availableCredits = rateLimiter.acquire(INCREMENT, 20, MILLISECONDS);
-      if (availableCredits <= 0) {
-        hasSeenZeros = true;
-      } else {
-        count += INCREMENT;
-      }
-    }
-
-    long rate = count * 1000 / TEST_INTERVAL;
-    verifyRate(rate);
-    Assert.assertTrue(Math.abs(callCount - TARGET_RATE_PER_TASK * TEST_INTERVAL / 1000 / INCREMENT) <= 2);
-    Assert.assertFalse(hasSeenZeros);
-  }
-
-  // Calling acquire on a limiter that was never passed to init(...) must throw IllegalStateException.
-  @Test(expected = IllegalStateException.class)
-  public void testFailsWhenUninitialized() {
-    new EmbeddedRateLimiter(100).acquire(1);
-  }
-
-  // EmbeddedRateLimiter must reject a tag-to-credits acquire with IllegalArgumentException.
-  @Test(expected = IllegalArgumentException.class)
-  public void testFailsWhenUsingTags() {
-    RateLimiter rateLimiter = new EmbeddedRateLimiter(10);
-    initRateLimiter(rateLimiter);
-    Map<String, Integer> tagToCredits = new HashMap<>();
-    tagToCredits.put("red", 1);
-    tagToCredits.put("green", 1);
-    rateLimiter.acquire(tagToCredits);
-  }
-
-  private void verifyRate(long rate) {
-    // As the actual rate would likely not be exactly the same as target rate, the calculation below
-    // verifies the actual rate is within 5% of the target rate per task
-    Assert.assertTrue(Math.abs(rate - TARGET_RATE_PER_TASK) <= TARGET_RATE_PER_TASK * 5 / 100);
-  }
-
-  // Calls rateLimiter.init with a mocked Config and a mocked TaskContext. The TaskContext returns
-  // the container context from mockSamzaContainerContext(), which reports NUMBER_OF_TASKS tasks.
-  static void initRateLimiter(RateLimiter rateLimiter) {
-    Config config = mock(Config.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    SamzaContainerContext containerContext = mockSamzaContainerContext();
-    when(taskContext.getSamzaContainerContext()).thenReturn(containerContext);
-    rateLimiter.init(config, taskContext);
-  }
-
-  // Returns a mocked SamzaContainerContext whose "taskNames" field is set by reflection to a mocked
-  // Collection whose size() returns NUMBER_OF_TASKS. Reflection is presumably needed because the
-  // mock is not built through a constructor that would populate the field; confirm.
-  // Any reflection failure is wrapped in a SamzaException.
-  static SamzaContainerContext mockSamzaContainerContext() {
-    try {
-      Collection<String> taskNames = mock(Collection.class);
-      when(taskNames.size()).thenReturn(NUMBER_OF_TASKS);
-      SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
-      Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames");
-      taskNamesField.setAccessible(true);
-      taskNamesField.set(containerContext, taskNames);
-      taskNamesField.setAccessible(false);
-      return containerContext;
-    } catch (Exception ex) {
-      throw new SamzaException(ex);
-    }
-  }
-}