You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:47 UTC

[07/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index a5eeba1..85306ef 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -32,10 +32,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 94ff1eb..c5c0d78 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -24,9 +24,9 @@ import java.util.Map;
 import java.util.Random;
 import org.slf4j.MDC;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
-import org.apache.samza.application.ApplicationDescriptorUtil;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 69eb5fe..1b38c9b 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -22,10 +22,10 @@ package org.apache.samza.runtime;
 import java.time.Duration;
 import java.util.List;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.execution.RemoteJobPlanner;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
new file mode 100644
index 0000000..aa0f6a4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for Samza framework internal usage.
+ * <p>
+ * Allows creating a {@link SystemDescriptor} without setting the factory class name, and delegating
+ * the rest of the system customization to configurations.
+ * <p>
+ * Useful for code-generation and testing use cases where the factory name is not known in advance.
+ */
+@SuppressWarnings("unchecked")
+public final class DelegatingSystemDescriptor extends SystemDescriptor<DelegatingSystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+
+  /**
+   * Constructs a {@link DelegatingSystemDescriptor} instance with no system level serde.
+   * Serdes must be provided explicitly at stream level when getting input or output descriptors.
+   * The SystemFactory class name must be provided in configuration.
+   *
+   * @param systemName name of this system
+   */
+  @VisibleForTesting
+  public DelegatingSystemDescriptor(String systemName) {
+    super(systemName, null, null, null);
+  }
+
+  @Override
+  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new GenericInputDescriptor<>(streamId, this, serde);
+  }
+
+  @Override
+  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new GenericOutputDescriptor<>(streamId, this, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
index 03be758..05454a6 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -27,8 +27,10 @@ import java.util.Map;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.TableImpl;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index d7b15a4..1d1e8b1 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -23,6 +23,8 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.context.Context;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.util.Util;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
deleted file mode 100644
index f9d4007..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.hybrid.BaseHybridTableDescriptor;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * Table descriptor for {@link CachingTable}.
- * @param <K> type of the key in the cache
- * @param <V> type of the value in the cache
- */
-public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
-  private Duration readTtl;
-  private Duration writeTtl;
-  private long cacheSize;
-  private TableDescriptor<K, V, ?> cache;
-  private TableDescriptor<K, V, ?> table;
-  private boolean isWriteAround;
-
-  /**
-   * Constructs a table descriptor instance with internal cache
-   *
-   * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
-   * @param table target table descriptor
-   */
-  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
-    super(tableId);
-    this.table = table;
-  }
-
-  /**
-   * Constructs a table descriptor instance and specify a cache (as Table descriptor)
-   * to be used for caching. Cache get is not synchronized with put for better parallelism
-   * in the read path of {@link CachingTable}. As such, cache table implementation is
-   * expected to be thread-safe for concurrent accesses.
-   *
-   * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
-   * @param table target table descriptor
-   * @param cache cache table descriptor
-   */
-  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
-      TableDescriptor<K, V, ?> cache) {
-    this(tableId, table);
-    this.cache = cache;
-  }
-
-  @Override
-  public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
-    return cache != null
-        ? Arrays.asList(cache, table)
-        : Arrays.asList(table);
-  }
-
-  @Override
-  public TableSpec getTableSpec() {
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    if (cache != null) {
-      tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
-    } else {
-      if (readTtl != null) {
-        tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis()));
-      }
-      if (writeTtl != null) {
-        tableSpecConfig.put(CachingTableProvider.WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
-      }
-      if (cacheSize > 0) {
-        tableSpecConfig.put(CachingTableProvider.CACHE_SIZE, String.valueOf(cacheSize));
-      }
-    }
-
-    tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
-    tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround));
-
-    return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
-  }
-
-  /**
-   * Specify the TTL for each read access, ie. record is expired after
-   * the TTL duration since last read access of each key.
-   * @param readTtl read TTL
-   * @return this descriptor
-   */
-  public CachingTableDescriptor withReadTtl(Duration readTtl) {
-    this.readTtl = readTtl;
-    return this;
-  }
-
-  /**
-   * Specify the TTL for each write access, ie. record is expired after
-   * the TTL duration since last write access of each key.
-   * @param writeTtl write TTL
-   * @return this descriptor
-   */
-  public CachingTableDescriptor withWriteTtl(Duration writeTtl) {
-    this.writeTtl = writeTtl;
-    return this;
-  }
-
-  /**
-   * Specify the max cache size for size-based eviction.
-   * @param cacheSize max size of the cache
-   * @return this descriptor
-   */
-  public CachingTableDescriptor withCacheSize(long cacheSize) {
-    this.cacheSize = cacheSize;
-    return this;
-  }
-
-  /**
-   * Specify if write-around policy should be used to bypass writing
-   * to cache for put operations. This is useful when put() is the
-   * dominant operation and get() has no locality with recent puts.
-   * @return this descriptor
-   */
-  public CachingTableDescriptor withWriteAround() {
-    this.isWriteAround = true;
-    return this;
-  }
-
-  @Override
-  protected void validate() {
-    super.validate();
-    Preconditions.checkNotNull(table, "Actual table is required.");
-    if (cache == null) {
-      Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
-    } else {
-      Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
-          "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
deleted file mode 100644
index c959a56..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.caching.guava.GuavaCacheTable;
-import org.apache.samza.table.utils.BaseTableProvider;
-
-import com.google.common.cache.CacheBuilder;
-
-/**
- * Table provider for {@link CachingTable}.
- */
-public class CachingTableProvider extends BaseTableProvider {
-
-  public static final String REAL_TABLE_ID = "realTableId";
-  public static final String CACHE_TABLE_ID = "cacheTableId";
-  public static final String READ_TTL_MS = "readTtl";
-  public static final String WRITE_TTL_MS = "writeTtl";
-  public static final String CACHE_SIZE = "cacheSize";
-  public static final String WRITE_AROUND = "writeAround";
-
-  // Store the cache instances created by default
-  private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
-
-  public CachingTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
-  }
-
-  @Override
-  public Table getTable() {
-    String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID);
-    ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
-
-    String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID);
-    ReadWriteTable cache;
-
-    if (cacheTableId != null) {
-      cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
-    } else {
-      cache = createDefaultCacheTable(realTableId);
-      defaultCaches.add(cache);
-    }
-
-    boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND));
-    CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
-    cachingTable.init(this.context);
-    return cachingTable;
-  }
-
-  @Override
-  public void close() {
-    defaultCaches.forEach(c -> c.close());
-  }
-
-  private ReadWriteTable createDefaultCacheTable(String tableId) {
-    long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(READ_TTL_MS, "-1"));
-    long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(WRITE_TTL_MS, "-1"));
-    long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CACHE_SIZE, "-1"));
-
-    CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
-    if (readTtlMs != -1) {
-      cacheBuilder.expireAfterAccess(readTtlMs, TimeUnit.MILLISECONDS);
-    }
-    if (writeTtlMs != -1) {
-      cacheBuilder.expireAfterWrite(writeTtlMs, TimeUnit.MILLISECONDS);
-    }
-    if (cacheSize != -1) {
-      cacheBuilder.maximumSize(cacheSize);
-    }
-
-    logger.info(String.format("Creating default cache with: readTtl=%d, writeTtl=%d, maxSize=%d",
-        readTtlMs, writeTtlMs, cacheSize));
-
-    GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build());
-    cacheTable.init(this.context);
-
-    return cacheTable;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
deleted file mode 100644
index 9262207..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching;
-
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-/**
- * Table provider factory for {@link CachingTable}.
- */
-public class CachingTableProviderFactory implements TableProviderFactory {
-  @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    return new CachingTableProvider(tableSpec);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
new file mode 100644
index 0000000..10665a3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.descriptors;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for {@link org.apache.samza.table.caching.CachingTable}.
+ * @param <K> type of the key in the cache
+ * @param <V> type of the value in the cache
+ */
+public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
+  private Duration readTtl;                 // written as READ_TTL_MS; required when no cache descriptor is set
+  private Duration writeTtl;                // written as WRITE_TTL_MS when non-null
+  private long cacheSize;                   // written as CACHE_SIZE only when > 0
+  private TableDescriptor<K, V, ?> cache;   // optional cache table; when null the TTL/size settings are used
+  private TableDescriptor<K, V, ?> table;   // the table being cached (REAL_TABLE_ID)
+  private boolean isWriteAround;            // written as WRITE_AROUND
+
+  /**
+   * Constructs a table descriptor instance with internal cache
+   *
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param table target table descriptor
+   */
+  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
+    super(tableId);
+    this.table = table;
+  }
+
+  /**
+   * Constructs a table descriptor instance and specify a cache (as Table descriptor)
+   * to be used for caching. Cache get is not synchronized with put for better parallelism
+   * in the read path of {@link org.apache.samza.table.caching.CachingTable}. As such, cache table implementation is
+   * expected to be thread-safe for concurrent accesses.
+   *
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param table target table descriptor
+   * @param cache cache table descriptor
+   */
+  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
+      TableDescriptor<K, V, ?> cache) {
+    this(tableId, table);
+    this.cache = cache;
+  }
+
+  @Override
+  public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
+    return cache != null
+        ? Arrays.asList(cache, table)
+        : Arrays.asList(table);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    if (cache != null) {
+      tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
+    } else {
+      if (readTtl != null) {
+        tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis()));
+      }
+      if (writeTtl != null) {
+        tableSpecConfig.put(CachingTableProvider.WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
+      }
+      if (cacheSize > 0) {
+        tableSpecConfig.put(CachingTableProvider.CACHE_SIZE, String.valueOf(cacheSize));
+      }
+    }
+
+    tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
+    tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround));
+
+    return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  /**
+   * Specify the TTL for each read access, i.e. record is expired after
+   * the TTL duration since last read access of each key.
+   * @param readTtl read TTL
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withReadTtl(Duration readTtl) {
+    this.readTtl = readTtl;
+    return this;
+  }
+
+  /**
+   * Specify the TTL for each write access, i.e. record is expired after
+   * the TTL duration since last write access of each key.
+   * @param writeTtl write TTL
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withWriteTtl(Duration writeTtl) {
+    this.writeTtl = writeTtl;
+    return this;
+  }
+
+  /**
+   * Specify the max cache size for size-based eviction.
+   * @param cacheSize max size of the cache
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withCacheSize(long cacheSize) {
+    this.cacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * Specify if write-around policy should be used to bypass writing
+   * to cache for put operations. This is useful when put() is the
+   * dominant operation and get() has no locality with recent puts.
+   * @return this descriptor
+   */
+  public CachingTableDescriptor<K, V> withWriteAround() {
+    this.isWriteAround = true;
+    return this;
+  }
+
+  @Override
+  @VisibleForTesting
+  public void validate() {
+    super.validate();
+    Preconditions.checkNotNull(table, "Actual table is required.");
+    if (cache == null) {
+      Preconditions.checkNotNull(readTtl, "readTtl must be specified.");  // no cache descriptor: a read TTL is mandatory
+    } else {
+      Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
+          "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java
new file mode 100644
index 0000000..007f372
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.descriptors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.caching.CachingTable;
+import org.apache.samza.table.caching.guava.GuavaCacheTable;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Table provider that builds a {@link CachingTable} from the ids and cache settings in the table spec config.
+ */
+public class CachingTableProvider extends BaseTableProvider {
+
+  public static final String REAL_TABLE_ID = "realTableId";
+  public static final String CACHE_TABLE_ID = "cacheTableId";
+  public static final String READ_TTL_MS = "readTtl";
+  public static final String WRITE_TTL_MS = "writeTtl";
+  public static final String CACHE_SIZE = "cacheSize";
+  public static final String WRITE_AROUND = "writeAround";
+
+  // Caches created here because no cacheTableId was configured; they are closed in close()
+  private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
+
+  public CachingTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  @Override
+  public Table getTable() {
+    String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID);
+    ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
+
+    String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID);
+    ReadWriteTable cache;
+
+    if (cacheTableId != null) {
+      cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
+    } else {
+      cache = createDefaultCacheTable(realTableId);
+      defaultCaches.add(cache);
+    }
+
+    boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND));
+    CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
+    cachingTable.init(this.context);
+    return cachingTable;
+  }
+
+  @Override
+  public void close() {
+    defaultCaches.forEach(c -> c.close());
+  }
+
+  private ReadWriteTable createDefaultCacheTable(String tableId) {
+    long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(READ_TTL_MS, "-1"));    // -1: not configured
+    long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(WRITE_TTL_MS, "-1"));  // -1: not configured
+    long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CACHE_SIZE, "-1"));     // -1: not configured
+
+    CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
+    if (readTtlMs != -1) {
+      cacheBuilder.expireAfterAccess(readTtlMs, TimeUnit.MILLISECONDS);
+    }
+    if (writeTtlMs != -1) {
+      cacheBuilder.expireAfterWrite(writeTtlMs, TimeUnit.MILLISECONDS);
+    }
+    if (cacheSize != -1) {
+      cacheBuilder.maximumSize(cacheSize);
+    }
+
+    logger.info(String.format("Creating default cache with: readTtl=%d, writeTtl=%d, maxSize=%d",
+        readTtlMs, writeTtlMs, cacheSize));
+
+    GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build());
+    cacheTable.init(this.context);
+
+    return cacheTable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java
new file mode 100644
index 0000000..68eb162
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.descriptors;
+
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * {@link TableProviderFactory} that hands out {@link CachingTableProvider} instances, the
+ * providers for {@link org.apache.samza.table.caching.CachingTable}.
+ */
+public class CachingTableProviderFactory implements TableProviderFactory {
+  /**
+   * Creates a provider for the given table spec.
+   *
+   * @param tableSpec spec of the caching table
+   * @return a new {@link CachingTableProvider} built from {@code tableSpec}
+   */
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    TableProvider provider = new CachingTableProvider(tableSpec);
+    return provider;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java
deleted file mode 100644
index 4a05013..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.guava;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.SerdeUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-
-
-/**
- * Table descriptor for {@link GuavaCacheTable}.
- * @param <K> type of the key in the cache
- * @param <V> type of the value in the cache
- */
-public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, GuavaCacheTableDescriptor<K, V>> {
-  private Cache<K, V> cache;
-
-  /**
-   * {@inheritDoc}
-   */
-  public GuavaCacheTableDescriptor(String tableId) {
-    super(tableId);
-  }
-
-  @Override
-  public TableSpec getTableSpec() {
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    tableSpecConfig.put(GuavaCacheTableProvider.GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
-
-    return new TableSpec(tableId, serde, GuavaCacheTableProviderFactory.class.getName(), tableSpecConfig);
-  }
-
-  /**
-   * Specify a pre-configured Guava cache instance to be used for caching table.
-   * @param cache Guava cache instance
-   * @return this descriptor
-   */
-  public GuavaCacheTableDescriptor withCache(Cache<K, V> cache) {
-    this.cache = cache;
-    return this;
-  }
-
-  @Override
-  protected void validate() {
-    super.validate();
-    Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
deleted file mode 100644
index 39f332e..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.guava;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.BaseTableProvider;
-import org.apache.samza.table.utils.SerdeUtils;
-
-import com.google.common.cache.Cache;
-
-
-/**
- * Table provider for {@link GuavaCacheTable}.
- */
-public class GuavaCacheTableProvider extends BaseTableProvider {
-
-  public static final String GUAVA_CACHE = "guavaCache";
-
-  private List<GuavaCacheTable> guavaTables = new ArrayList<>();
-
-  public GuavaCacheTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
-  }
-
-  @Override
-  public Table getTable() {
-    Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, tableSpec.getConfig().get(GUAVA_CACHE));
-    GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
-    table.init(this.context);
-    guavaTables.add(table);
-    return table;
-  }
-
-  @Override
-  public void close() {
-    guavaTables.forEach(t -> t.close());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
deleted file mode 100644
index 066c6f9..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.guava;
-
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-/**
- * Table provider factory for {@link GuavaCacheTable}.
- */
-public class GuavaCacheTableProviderFactory implements TableProviderFactory {
-  @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    return new GuavaCacheTableProvider(tableSpec);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java
new file mode 100644
index 0000000..e0224bb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.guava.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.SerdeUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+
+
+/**
+ * Table descriptor for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
+ * @param <K> type of the key in the cache
+ * @param <V> type of the value in the cache
+ */
+public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, GuavaCacheTableDescriptor<K, V>> {
+  private Cache<K, V> cache;
+
+  /**
+   * Constructs a descriptor for a Guava cache backed table.
+   * @param tableId Id of the table
+   */
+  public GuavaCacheTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Validates this descriptor and builds its {@link TableSpec}. The spec config holds the
+   * descriptor's own config plus the serialized cache, stored under
+   * {@link GuavaCacheTableProvider#GUAVA_CACHE}.
+   * @return the {@link TableSpec} for this table
+   */
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    tableSpecConfig.put(GuavaCacheTableProvider.GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
+
+    return new TableSpec(tableId, serde, GuavaCacheTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  /**
+   * Specify a pre-configured Guava cache instance to be used for caching table.
+   * @param cache Guava cache instance
+   * @return this descriptor
+   */
+  // Returns the parameterized type (not the raw one) so K and V survive fluent call chains.
+  public GuavaCacheTableDescriptor<K, V> withCache(Cache<K, V> cache) {
+    this.cache = cache;
+    return this;
+  }
+
+  /**
+   * Runs the base validation and additionally requires that a cache was supplied through
+   * {@link #withCache(Cache)}.
+   */
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java
new file mode 100644
index 0000000..45d1fdc
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.guava.descriptors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.caching.guava.GuavaCacheTable;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+import org.apache.samza.table.utils.SerdeUtils;
+
+import com.google.common.cache.Cache;
+
+
+/**
+ * {@link BaseTableProvider} that creates {@link GuavaCacheTable} instances from the Guava cache
+ * stored in serialized form in the table spec config under {@link #GUAVA_CACHE}.
+ */
+public class GuavaCacheTableProvider extends BaseTableProvider {
+
+  /** Config key under which the serialized Guava cache is stored. */
+  public static final String GUAVA_CACHE = "guavaCache";
+
+  // Every table handed out by getTable(), kept so that close() can close them.
+  private List<GuavaCacheTable> guavaTables = new ArrayList<>();
+
+  public GuavaCacheTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  /**
+   * Deserializes the cache from the table spec config, wraps it in a new
+   * {@link GuavaCacheTable}, initializes that table and remembers it for {@link #close()}.
+   *
+   * @return the newly created table
+   */
+  @Override
+  public Table getTable() {
+    String serializedCache = tableSpec.getConfig().get(GUAVA_CACHE);
+    Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, serializedCache);
+
+    GuavaCacheTable created = new GuavaCacheTable(tableSpec.getId(), guavaCache);
+    created.init(this.context);
+    guavaTables.add(created);
+    return created;
+  }
+
+  /**
+   * Closes all tables previously returned by {@link #getTable()}.
+   */
+  @Override
+  public void close() {
+    for (GuavaCacheTable guavaTable : guavaTables) {
+      guavaTable.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java
new file mode 100644
index 0000000..01228cc
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.guava.descriptors;
+
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * {@link TableProviderFactory} that hands out {@link GuavaCacheTableProvider} instances, the
+ * providers for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
+ */
+public class GuavaCacheTableProviderFactory implements TableProviderFactory {
+  /**
+   * Creates a provider for the given table spec.
+   *
+   * @param tableSpec spec of the Guava cache table
+   * @return a new {@link GuavaCacheTableProvider} built from {@code tableSpec}
+   */
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    TableProvider provider = new GuavaCacheTableProvider(tableSpec);
+    return provider;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java
new file mode 100644
index 0000000..15486c7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import java.util.List;
+
+/**
+ * Base class for hybrid table descriptors. A hybrid table is composed of one or more
+ * table descriptors and coordinates operations between them to provide more advanced
+ * functionality.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ * @param <D> the type of this table descriptor
+ */
+public abstract class BaseHybridTableDescriptor<K, V, D extends BaseHybridTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  /**
+   * Constructs a hybrid table descriptor instance.
+   * @param tableId Id of the table
+   */
+  public BaseHybridTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Returns the descriptors of the tables contained within this table.
+   * @return list of table descriptors
+   */
+  public abstract List<? extends TableDescriptor<K, V, ?>> getTableDescriptors();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
new file mode 100644
index 0000000..246216b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Common base for table descriptor implementations. It holds the table id, the key/value
+ * serde and the config entries supplied through {@link #withConfig(String, String)}.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+public abstract class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
+    implements TableDescriptor<K, V, D> {
+
+  protected final String tableId;
+
+  // Defaults to no-op serdes for key and value unless a serde is passed to the constructor.
+  protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
+
+  protected final Map<String, String> config = new HashMap<>();
+
+  /**
+   * Creates a table descriptor that keeps the default serde.
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  protected BaseTableDescriptor(String tableId) {
+    this.tableId = tableId;
+  }
+
+  /**
+   * Creates a table descriptor that uses the supplied serde in place of the default one.
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    this(tableId);
+    this.serde = serde;
+  }
+
+  /**
+   * Records a config entry on this descriptor; a later call with the same key replaces the
+   * earlier value.
+   *
+   * @param key the config key
+   * @param value the config value
+   * @return this descriptor, cast to its concrete type
+   */
+  @Override
+  public D withConfig(String key, String value) {
+    this.config.put(key, value);
+    return (D) this;
+  }
+
+  /**
+   * Returns the id this descriptor was constructed with.
+   *
+   * @return the table id
+   */
+  @Override
+  public String getTableId() {
+    return this.tableId;
+  }
+
+  /**
+   * Returns the serde assigned to this {@link TableDescriptor}.
+   *
+   * @return {@link KVSerde} used by this table
+   */
+  public KVSerde<K, V> getSerde() {
+    return this.serde;
+  }
+
+  /**
+   * Copies every config entry of this descriptor into the given map; used internally when
+   * building a {@link TableSpec}.
+   * @param tableSpecConfig configuration for the {@link TableSpec}
+   */
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    tableSpecConfig.putAll(this.config);
+  }
+
+  /**
+   * Hook for checking that this table descriptor is constructed properly; used internally.
+   * The base implementation performs no checks.
+   */
+  protected void validate() {
+  }
+
+  /**
+   * Creates a {@link TableSpec} from this table descriptor; used internally.
+   *
+   * @return the {@link TableSpec}
+   */
+  public abstract TableSpec getTableSpec();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java
deleted file mode 100644
index 48efd0c..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.hybrid;
-
-import java.util.List;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
-
-/**
- * Base class for hybrid table descriptors. A hybrid table consists of one or more
- * table descriptors, and it orchestrates operations between them to achieve more advanced
- * functionality.
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- * @param <D> the type of this table descriptor
- */
-abstract public class BaseHybridTableDescriptor<K, V, D extends BaseHybridTableDescriptor<K, V, D>>
-    extends BaseTableDescriptor<K, V, D> {
-
-  /**
-   * {@inheritDoc}
-   */
-  public BaseHybridTableDescriptor(String tableId) {
-    super(tableId);
-  }
-
-  /**
-   * Get tables contained within this table.
-   * @return list of tables
-   */
-  abstract public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 4cbc270..52bdc71 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -44,9 +44,8 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
 
   private DefaultTableWriteMetrics writeMetrics;
 
-  @VisibleForTesting
-  final TableWriteFunction<K, V> writeFn;
-  final TableRateLimiter writeRateLimiter;
+  protected final TableWriteFunction<K, V> writeFn;
+  protected final TableRateLimiter writeRateLimiter;
 
   public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
       TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
@@ -219,4 +218,14 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     writeFn.close();
     super.close();
   }
+
+  /**
+   * Exposes the write function to tests.
+   *
+   * @return the {@code writeFn} held by this table
+   */
+  @VisibleForTesting
+  public TableWriteFunction<K, V> getWriteFn() {
+    return this.writeFn;
+  }
+
+  /**
+   * Exposes the write rate limiter to tests. The return type is parameterized with the table's
+   * key and value types, matching the {@code TableRateLimiter<K, V>} accepted by the constructor.
+   *
+   * @return the {@code writeRateLimiter} held by this table
+   */
+  @VisibleForTesting
+  @SuppressWarnings("unchecked") // the backing field is declared with the raw type
+  public TableRateLimiter<K, V> getWriteRateLimiter() {
+    return writeRateLimiter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index 9487e39..0ae2728 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -77,13 +77,11 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
 
   protected final ExecutorService callbackExecutor;
   protected final ExecutorService tableExecutor;
+  protected final TableReadFunction<K, V> readFn;
+  protected final TableRateLimiter<K, V> readRateLimiter;
 
   private DefaultTableReadMetrics readMetrics;
 
-  @VisibleForTesting
-  final TableReadFunction<K, V> readFn;
-  final TableRateLimiter<K, V> readRateLimiter;
-
   /**
    * Construct a RemoteReadableTable instance
    * @param tableId table id
@@ -298,4 +296,24 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
   public void close() {
     readFn.close();
   }
+
+  /**
+   * Exposes the callback executor to tests.
+   *
+   * @return the {@code callbackExecutor} held by this table
+   */
+  @VisibleForTesting
+  public ExecutorService getCallbackExecutor() {
+    return this.callbackExecutor;
+  }
+
+  /**
+   * Exposes the table executor to tests.
+   *
+   * @return the {@code tableExecutor} held by this table
+   */
+  @VisibleForTesting
+  public ExecutorService getTableExecutor() {
+    return this.tableExecutor;
+  }
+
+  /**
+   * Exposes the read function to tests.
+   *
+   * @return the {@code readFn} held by this table
+   */
+  @VisibleForTesting
+  public TableReadFunction<K, V> getReadFn() {
+    return this.readFn;
+  }
+
+  /**
+   * Exposes the read rate limiter to tests.
+   *
+   * @return the {@code readRateLimiter} held by this table
+   */
+  @VisibleForTesting
+  public TableRateLimiter<K, V> getReadRateLimiter() {
+    return this.readRateLimiter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
deleted file mode 100644
index fe01e6b..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.table.utils.SerdeUtils;
-import org.apache.samza.util.EmbeddedTaggedRateLimiter;
-import org.apache.samza.util.RateLimiter;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * Table descriptor for remote store backed tables
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
-  /**
-   * Tag to be used for provisioning credits for rate limiting read operations from the remote table.
-   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
-   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
-   * TableRateLimiter.CreditFunction)}
-   */
-  public static final String RL_READ_TAG = "readTag";
-
-  /**
-   * Tag to be used for provisioning credits for rate limiting write operations into the remote table.
-   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
-   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
-   * TableRateLimiter.CreditFunction)} and it needs the write functionality.
-   */
-  public static final String RL_WRITE_TAG = "writeTag";
-
-  // Input support for a specific remote store (required)
-  private TableReadFunction<K, V> readFn;
-
-  // Output support for a specific remote store (optional)
-  private TableWriteFunction<K, V> writeFn;
-
-  // Rate limiter for client-side throttling;
-  // can either be constructed indirectly from rates or overridden by withRateLimiter()
-  private RateLimiter rateLimiter;
-
-  // Rates for constructing the default rate limiter when they are non-zero
-  private Map<String, Integer> tagCreditsMap = new HashMap<>();
-
-  private TableRateLimiter.CreditFunction<K, V> readCreditFn;
-  private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
-
-  private TableRetryPolicy readRetryPolicy;
-  private TableRetryPolicy writeRetryPolicy;
-
-  // By default execute future callbacks on the native client threads
-  // ie. no additional thread pool for callbacks.
-  private int asyncCallbackPoolSize = -1;
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   */
-  public RemoteTableDescriptor(String tableId) {
-    super(tableId);
-  }
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   * @param serde the serde for key and value
-   */
-  public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    super(tableId, serde);
-  }
-
-  @Override
-  public TableSpec getTableSpec() {
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    // Serialize and store reader/writer functions
-    tableSpecConfig.put(RemoteTableProvider.READ_FN, SerdeUtils.serialize("read function", readFn));
-
-    if (writeFn != null) {
-      tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
-    }
-
-    // Serialize the rate limiter if specified
-    if (!tagCreditsMap.isEmpty()) {
-      rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
-    }
-
-    if (rateLimiter != null) {
-      tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
-    }
-
-    // Serialize the readCredit and writeCredit functions
-    if (readCreditFn != null) {
-      tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
-          "read credit function", readCreditFn));
-    }
-
-    if (writeCreditFn != null) {
-      tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
-          "write credit function", writeCreditFn));
-    }
-
-    if (readRetryPolicy != null) {
-      tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
-          "read retry policy", readRetryPolicy));
-    }
-
-    if (writeRetryPolicy != null) {
-      tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
-          "write retry policy", writeRetryPolicy));
-    }
-
-    tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
-
-    return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
-  }
-
-  /**
-   * Use specified TableReadFunction with remote table.
-   * @param readFn read function instance
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
-    Preconditions.checkNotNull(readFn, "null read function");
-    this.readFn = readFn;
-    return this;
-  }
-
-  /**
-   * Use specified TableWriteFunction with remote table.
-   * @param writeFn write function instance
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
-    Preconditions.checkNotNull(writeFn, "null write function");
-    this.writeFn = writeFn;
-    return this;
-  }
-
-  /**
-   * Use specified TableReadFunction with remote table and a retry policy.
-   * @param readFn read function instance
-   * @param retryPolicy retry policy for the read function
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
-    Preconditions.checkNotNull(readFn, "null read function");
-    Preconditions.checkNotNull(retryPolicy, "null retry policy");
-    this.readFn = readFn;
-    this.readRetryPolicy = retryPolicy;
-    return this;
-  }
-
-  /**
-   * Use specified TableWriteFunction with remote table and a retry policy.
-   * @param writeFn write function instance
-   * @param retryPolicy retry policy for the write function
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
-    Preconditions.checkNotNull(writeFn, "null write function");
-    Preconditions.checkNotNull(retryPolicy, "null retry policy");
-    this.writeFn = writeFn;
-    this.writeRetryPolicy = retryPolicy;
-    return this;
-  }
-
-  /**
-   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
-   * of credits to be charged from the rate limiter for table read and write operations.
-   * This is an advanced API that provides greater flexibility to throttle each record in the table
-   * with different number of credits. For most common use-cases eg: limit the number of read/write
-   * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
-   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
-   *
-   * @param rateLimiter rate limiter instance to be used for throttling
-   * @param readCreditFn credit function for rate limiting read operations
-   * @param writeCreditFn credit function for rate limiting write operations
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
-      TableRateLimiter.CreditFunction<K, V> readCreditFn,
-      TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
-    Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
-    this.rateLimiter = rateLimiter;
-    this.readCreditFn = readCreditFn;
-    this.writeCreditFn = writeCreditFn;
-    return this;
-  }
-
-  /**
-   * Specify the rate limit for table read operations. If the read rate limit is set with this method
-   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
-   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
-   * and vice versa.
-   * @param creditsPerSec rate limit for read operations; must be positive
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
-    Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
-    tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
-    return this;
-  }
-
-  /**
-   * Specify the rate limit for table write operations. If the write rate limit is set with this method
-   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
-   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
-   * and vice versa.
-   * @param creditsPerSec rate limit for write operations; must be positive
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
-    Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
-    tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
-    return this;
-  }
-
-  /**
-   * Specify the size of the thread pool for the executor used to execute
-   * callbacks of CompletableFutures of async Table operations. By default, these
-   * futures are completed (called) by the threads of the native store client. Depending
-   * on the implementation of the native client, it may or may not allow executing long
-   * running operations in the callbacks. This config can be used to execute the callbacks
-   * from a separate executor to decouple from the native client. If configured, this
-   * thread pool is shared by all read and write operations.
-   * @param poolSize max number of threads in the executor for async callbacks
-   * @return this table descriptor instance
-   */
-  public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
-    this.asyncCallbackPoolSize = poolSize;
-    return this;
-  }
-
-  @Override
-  protected void validate() {
-    super.validate();
-    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
-    Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
-        "Only one of rateLimiter instance or read/write limits can be specified");
-    // Assume callback executor pool should have no more than 20 threads
-    Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
-        "too many threads for async callback executor.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
deleted file mode 100644
index 9415e70..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.table.utils.BaseTableProvider;
-import org.apache.samza.table.utils.SerdeUtils;
-import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.util.RateLimiter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
-import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
-
-
-/**
- * Provider for remote table instances
- */
-public class RemoteTableProvider extends BaseTableProvider {
-
-  static final String READ_FN = "io.read.func";
-  static final String WRITE_FN = "io.write.func";
-  static final String RATE_LIMITER = "io.ratelimiter";
-  static final String READ_CREDIT_FN = "io.read.credit.func";
-  static final String WRITE_CREDIT_FN = "io.write.credit.func";
-  static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
-  static final String READ_RETRY_POLICY = "io.read.retry.policy";
-  static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
-
-  private final boolean readOnly;
-  private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
-
-  /**
-   * Map of tableId -> executor service for async table IO and callbacks. The same executors
-   * are shared by both read/write operations such that tables of the same tableId all share
-   * the same set of executors globally, whereas the table itself is per-task.
-   */
-  private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
-  private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
-  private static ScheduledExecutorService retryExecutor;
-
-  public RemoteTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
-    this.readOnly = !tableSpec.getConfig().containsKey(WRITE_FN);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Table getTable() {
-    RemoteReadableTable table;
-    String tableId = tableSpec.getId();
-
-    TableReadFunction readFn = getReadFn();
-    RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
-    if (rateLimiter != null) {
-      rateLimiter.init(this.context);
-    }
-    TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
-    TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RL_READ_TAG);
-
-    TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
-    TableRateLimiter writeRateLimiter = null;
-
-    TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
-    TableRetryPolicy writeRetryPolicy = null;
-
-    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
-      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
-          Thread thread = new Thread(runnable);
-          thread.setName("table-retry-executor");
-          thread.setDaemon(true);
-          return thread;
-        });
-    }
-
-    if (readRetryPolicy != null) {
-      readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
-    }
-
-    TableWriteFunction writeFn = getWriteFn();
-
-    boolean isRateLimited = readRateLimiter.isRateLimited();
-    if (!readOnly) {
-      writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
-      writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG);
-      isRateLimited |= writeRateLimiter.isRateLimited();
-      writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY);
-      if (writeRetryPolicy != null) {
-        writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
-      }
-    }
-
-    // Optional executor for future callback/completion. Shared by both read and write operations.
-    int callbackPoolSize = Integer.parseInt(tableSpec.getConfig().get(ASYNC_CALLBACK_POOL_SIZE));
-    if (callbackPoolSize > 0) {
-      callbackExecutors.computeIfAbsent(tableId, (arg) ->
-          Executors.newFixedThreadPool(callbackPoolSize, (runnable) -> {
-              Thread thread = new Thread(runnable);
-              thread.setName("table-" + tableId + "-async-callback-pool");
-              thread.setDaemon(true);
-              return thread;
-            }));
-    }
-
-    if (isRateLimited) {
-      tableExecutors.computeIfAbsent(tableId, (arg) ->
-          Executors.newSingleThreadExecutor(runnable -> {
-              Thread thread = new Thread(runnable);
-              thread.setName("table-" + tableId + "-async-executor");
-              thread.setDaemon(true);
-              return thread;
-            }));
-    }
-
-    if (readOnly) {
-      table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
-          tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    } else {
-      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
-          writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    }
-
-    TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
-    if (readRetryPolicy != null) {
-      ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
-    }
-    if (writeRetryPolicy != null) {
-      ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
-    }
-
-    table.init(this.context);
-    tables.add(table);
-    return table;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void close() {
-    tables.forEach(t -> t.close());
-    tableExecutors.values().forEach(e -> e.shutdown());
-    callbackExecutors.values().forEach(e -> e.shutdown());
-  }
-
-  private <T> T deserializeObject(String key) {
-    String entry = tableSpec.getConfig().getOrDefault(key, "");
-    if (entry.isEmpty()) {
-      return null;
-    }
-    return SerdeUtils.deserialize(key, entry);
-  }
-
-  private TableReadFunction<?, ?> getReadFn() {
-    TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
-    if (readFn != null) {
-      readFn.init(this.context);
-    }
-    return readFn;
-  }
-
-  private TableWriteFunction<?, ?> getWriteFn() {
-    TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
-    if (writeFn != null) {
-      writeFn.init(this.context);
-    }
-    return writeFn;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
deleted file mode 100644
index 0eb88fd..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * Factory class for a remote table provider
- */
-public class RemoteTableProviderFactory implements TableProviderFactory {
-  @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    Preconditions.checkNotNull(tableSpec, "null table spec");
-    return new RemoteTableProvider(tableSpec);
-  }
-}