You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/21 01:22:35 UTC

[2/3] samza git commit: SAMZA-1998: Table API refactoring

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index bf032a2..1e44fa5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -22,8 +22,6 @@ import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.table.TableSpec;
-
 
 /**
  * The spec for operator that writes a stream to a table by extracting keys and values
@@ -35,21 +33,21 @@ import org.apache.samza.table.TableSpec;
 @InterfaceStability.Unstable
 public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> {
 
-  private final TableSpec tableSpec;
+  private final String tableId;
 
   /**
    * Constructor for a {@link SendToTableOperatorSpec}.
    *
-   * @param tableSpec  the table spec of the table written to
+   * @param tableId  the Id of the table written to
    * @param opId  the unique ID for this operator
    */
-  SendToTableOperatorSpec(TableSpec tableSpec, String opId) {
+  SendToTableOperatorSpec(String tableId, String opId) {
     super(OpCode.SEND_TO, opId);
-    this.tableSpec = tableSpec;
+    this.tableId = tableId;
   }
 
-  public TableSpec getTableSpec() {
-    return tableSpec;
+  public String getTableId() {
+    return tableId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
index 1849c64..a4f6068 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
@@ -22,8 +22,6 @@ import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.table.TableSpec;
-
 
 /**
  * The spec for stream-table join operator that retrieves a record from the table using key
@@ -36,24 +34,24 @@ import org.apache.samza.table.TableSpec;
 @InterfaceStability.Unstable
 public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM> {
 
-  private final TableSpec tableSpec;
+  private final String tableId;
   private final StreamTableJoinFunction<K, M, R, JM> joinFn;
 
   /**
    * Constructor for {@link StreamTableJoinOperatorSpec}.
    *
-   * @param tableSpec  the table spec for the table on the right side of the join
+   * @param tableId  the Id of the table on the right side of the join
    * @param joinFn  the user-defined join function to get join keys and results
    * @param opId  the unique ID for this operator
    */
-  StreamTableJoinOperatorSpec(TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
+  StreamTableJoinOperatorSpec(String tableId, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
     super(OpCode.JOIN, opId);
-    this.tableSpec = tableSpec;
+    this.tableId = tableId;
     this.joinFn = joinFn;
   }
 
-  public TableSpec getTableSpec() {
-    return tableSpec;
+  public String getTableId() {
+    return tableId;
   }
 
   public StreamTableJoinFunction<K, M, R, JM> getJoinFn() {

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
index 6fa040b..bb83e9c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
@@ -18,10 +18,7 @@
  */
 package org.apache.samza.table;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.context.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,38 +31,31 @@ abstract public class BaseTableProvider implements TableProvider {
 
   final protected Logger logger = LoggerFactory.getLogger(getClass());
 
-  final protected TableSpec tableSpec;
+  final protected String tableId;
 
-  protected Context context;
+  // Job config
+  final protected Config config;
 
-  public BaseTableProvider(TableSpec tableSpec) {
-    this.tableSpec = tableSpec;
-  }
+  protected Context context;
 
   /**
-   * {@inheritDoc}
+   * Construct the table provider using table Id and job configuration
+   * @param tableId Id of the table
+   * @param config job configuration
    */
+  public BaseTableProvider(String tableId, Config config) {
+    this.tableId = tableId;
+    this.config = config;
+  }
+
   @Override
   public void init(Context context) {
     this.context = context;
+    logger.info("Initializing table provider for table " + tableId);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
-  public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
-    Map<String, String> tableConfig = new HashMap<>();
-
-    // Insert table_id prefix to config entries
-    tableSpec.getConfig().forEach((k, v) -> {
-        String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
-        tableConfig.put(realKey, v);
-      });
-
-    logger.info("Generated configuration for table " + tableSpec.getId());
-
-    return tableConfig;
+  public void close() {
+    logger.info("Closing table provider for table " + tableId);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
index 55db637..05cfacd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -19,80 +19,26 @@
 
 package org.apache.samza.table;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.operators.TableImpl;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Helper class to generate table configs.
  */
 public class TableConfigGenerator {
-  private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
-
-  /**
-   * Generate table configurations given a list of table descriptors
-   * @param config the job configuration
-   * @param tableDescriptors the list of tableDescriptors
-   * @return configuration for the tables
-   */
-  static public Map<String, String> generateConfigsForTableDescs(Config config, List<TableDescriptor> tableDescriptors) {
-    return generateConfigsForTableSpecs(config, getTableSpecs(tableDescriptors));
-  }
-
-  /**
-   * Generate table configurations given a list of table specs
-   * @param config the job configuration
-   * @param tableSpecs the list of tableSpecs
-   * @return configuration for the tables
-   */
-  static public Map<String, String> generateConfigsForTableSpecs(Config config, List<TableSpec> tableSpecs) {
-    Map<String, String> tableConfigs = new HashMap<>();
 
-    tableSpecs.forEach(tableSpec -> {
-        // Add table provider factory config
-        tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
-            tableSpec.getTableProviderFactoryClassName());
-
-        // Generate additional configuration
-        TableProviderFactory tableProviderFactory =
-            Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
-        TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
-        tableConfigs.putAll(tableProvider.generateConfig(config, tableConfigs));
-      });
+  private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
 
-    LOG.info("TableConfigGenerator has generated configs {}", tableConfigs);
-    return tableConfigs;
+  static public Map<String, String> generate(Config jobConfig, List<TableDescriptor> tableDescriptors) {
+    Map<String, String> tableConfig = new HashMap<>();
+    tableDescriptors.forEach(tableDescriptor -> tableConfig.putAll(tableDescriptor.toConfig(jobConfig)));
+    LOG.info("TableConfigGenerator has generated configs {}", tableConfig);
+    return tableConfig;
   }
 
-  /**
-   * Get list of table specs given a list of table descriptors.
-   * @param tableDescs the list of tableDescriptors
-   * @return list of tableSpecs
-   */
-  static public List<TableSpec> getTableSpecs(List<TableDescriptor> tableDescs) {
-    Map<TableSpec, TableImpl> tableSpecs = new LinkedHashMap<>();
-
-    tableDescs.forEach(tableDesc -> {
-        TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
-
-        if (tableSpecs.containsKey(tableSpec)) {
-          throw new IllegalStateException(
-              String.format("getTable() invoked multiple times with the same tableId: %s", tableDesc.getTableId()));
-        }
-        tableSpecs.put(tableSpec, new TableImpl(tableSpec));
-      });
-    return new ArrayList<>(tableSpecs.keySet());
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index d7b15a4..cae7548 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -23,8 +23,6 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.context.Context;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,12 +33,12 @@ import java.util.Map;
 
 /**
  * A {@link TableManager} manages tables within a Samza task. For each table, it maintains
- * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} instance.
+ * the {@link TableProvider} and the {@link Table} instance.
  * It is used at execution for {@link org.apache.samza.container.TaskInstance} to retrieve
  * table instances for read/write operations.
  *
- * A {@link TableManager} is constructed from job configuration, the {@link TableSpec}
- * and {@link TableProvider} are constructed by processing the job configuration
+ * A {@link TableManager} is constructed from job configuration, the
+ * {@link TableProvider} are constructed by processing the job configuration
  * during initialization. The {@link Table} is constructed when {@link #getTable(String)}
  * is called and cached.
  *
@@ -54,8 +52,7 @@ import java.util.Map;
  */
 public class TableManager {
 
-  static public class TableCtx {
-    private TableSpec tableSpec;
+  static class TableCtx {
     private TableProvider tableProvider;
     private Table table;
   }
@@ -70,27 +67,13 @@ public class TableManager {
   /**
    * Construct a table manager instance
    * @param config job configuration
-   * @param serdes Serde instances for tables
    */
-  public TableManager(Config config, Map<String, Serde<Object>> serdes) {
+  public TableManager(Config config) {
     new JavaTableConfig(config).getTableIds().forEach(tableId -> {
-
-        // Construct the table provider
-        String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));
-
-        // Construct the KVSerde
-        JavaTableConfig tableConfig = new JavaTableConfig(config);
-        KVSerde serde = KVSerde.of(
-            serdes.get(tableConfig.getKeySerde(tableId)),
-            serdes.get(tableConfig.getValueSerde(tableId)));
-
-        TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory,
-            config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "."));
-
-        addTable(tableSpec);
-
-        logger.info("Added table " + tableSpec.getId());
+        addTable(tableId, config);
+        logger.debug("Added table " + tableId);
       });
+    logger.info(String.format("Added %d tables", tableContexts.size()));
   }
 
   /**
@@ -102,20 +85,17 @@ public class TableManager {
     initialized = true;
   }
 
-  /**
-   * Add a table to the table manager
-   * @param tableSpec the table spec
-   */
-  private void addTable(TableSpec tableSpec) {
-    if (tableContexts.containsKey(tableSpec.getId())) {
-      throw new SamzaException("Table " + tableSpec.getId() + " already exists");
+  private void addTable(String tableId, Config config) {
+    if (tableContexts.containsKey(tableId)) {
+      throw new SamzaException("Table " + tableId + " already exists");
     }
-    TableCtx ctx = new TableCtx();
+    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    String providerFactoryClassName = tableConfig.getTableProviderFactory(tableId);
     TableProviderFactory tableProviderFactory =
-        Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
-    ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
-    ctx.tableSpec = tableSpec;
-    tableContexts.put(tableSpec.getId(), ctx);
+        Util.getObj(providerFactoryClassName, TableProviderFactory.class);
+    TableCtx ctx = new TableCtx();
+    ctx.tableProvider = tableProviderFactory.getTableProvider(tableId, config);
+    tableContexts.put(tableId, ctx);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index 32d2bed..703a6ff 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -85,9 +85,6 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
     this.isWriteAround = isWriteAround;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void init(Context context) {
     readMetrics = new DefaultTableReadMetrics(context, this, tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index 1a400a4..f78347a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -23,10 +23,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.BaseTableProvider;
@@ -41,16 +42,17 @@ public class CachingTableProvider extends BaseTableProvider {
   // Store the cache instances created by default
   private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
 
-  public CachingTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
+  public CachingTableProvider(String tableId, Config config) {
+    super(tableId, config);
   }
 
   @Override
   public Table getTable() {
-    String realTableId = tableSpec.getConfig().get(CachingTableDescriptor.REAL_TABLE_ID);
+    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID);
     ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
 
-    String cacheTableId = tableSpec.getConfig().get(CachingTableDescriptor.CACHE_TABLE_ID);
+    String cacheTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_TABLE_ID);
     ReadWriteTable cache;
 
     if (cacheTableId != null) {
@@ -60,21 +62,23 @@ public class CachingTableProvider extends BaseTableProvider {
       defaultCaches.add(cache);
     }
 
-    boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(CachingTableDescriptor.WRITE_AROUND));
-    CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
+    boolean isWriteAround = Boolean.parseBoolean(tableConfig.getForTable(tableId, CachingTableDescriptor.WRITE_AROUND));
+    CachingTable cachingTable = new CachingTable(tableId, table, cache, isWriteAround);
     cachingTable.init(this.context);
     return cachingTable;
   }
 
   @Override
   public void close() {
+    super.close();
     defaultCaches.forEach(c -> c.close());
   }
 
   private ReadWriteTable createDefaultCacheTable(String tableId) {
-    long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.READ_TTL_MS, "-1"));
-    long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.WRITE_TTL_MS, "-1"));
-    long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.CACHE_SIZE, "-1"));
+    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    long readTtlMs = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.READ_TTL_MS, "-1"));
+    long writeTtlMs = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.WRITE_TTL_MS, "-1"));
+    long cacheSize = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_SIZE, "-1"));
 
     CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
     if (readTtlMs != -1) {

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
index 2ac3694..f421538 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
@@ -19,16 +19,17 @@
 
 package org.apache.samza.table.caching;
 
+import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
 
 /**
  * Table provider factory for {@link org.apache.samza.table.caching.CachingTable}.
  */
 public class CachingTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    return new CachingTableProvider(tableSpec);
+  public TableProvider getTableProvider(String tableId, Config config) {
+    return new CachingTableProvider(tableId, config);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index 5f77ee4..391f068 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -48,9 +48,6 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
     this.cache = cache;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void init(Context context) {
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index 5c9b2af..21c78cc 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -22,8 +22,9 @@ package org.apache.samza.table.caching.guava;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.BaseTableProvider;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
 import org.apache.samza.table.utils.SerdeUtils;
@@ -38,15 +39,16 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
 
   private List<GuavaCacheTable> guavaTables = new ArrayList<>();
 
-  public GuavaCacheTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
+  public GuavaCacheTableProvider(String tableId, Config config) {
+    super(tableId, config);
   }
 
   @Override
   public Table getTable() {
+    JavaTableConfig tableConfig = new JavaTableConfig(config);
     Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,
-        tableSpec.getConfig().get(GuavaCacheTableDescriptor.GUAVA_CACHE));
-    GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
+        tableConfig.getForTable(tableId, GuavaCacheTableDescriptor.GUAVA_CACHE));
+    GuavaCacheTable table = new GuavaCacheTable(tableId, guavaCache);
     table.init(this.context);
     guavaTables.add(table);
     return table;
@@ -54,6 +56,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
 
   @Override
   public void close() {
+    super.close();
     guavaTables.forEach(t -> t.close());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
index ac060c4..d5323f5 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
@@ -19,16 +19,16 @@
 
 package org.apache.samza.table.caching.guava;
 
+import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
 
 /**
  * Table provider factory for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
  */
 public class GuavaCacheTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    return new GuavaCacheTableProvider(tableSpec);
+  public TableProvider getTableProvider(String tableId, Config config) {
+    return new GuavaCacheTableProvider(tableId, config);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 52bdc71..60ac4b7 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -56,9 +56,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     this.writeRateLimiter = writeRateLimiter;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void init(Context context) {
     super.init(context);
@@ -67,9 +64,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void put(K key, V value) {
     try {
@@ -79,9 +73,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public CompletableFuture<Void> putAsync(K key, V value) {
     Preconditions.checkNotNull(key);
@@ -96,9 +87,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
           });
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void putAll(List<Entry<K, V>> entries) {
     try {
@@ -108,9 +96,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
     Preconditions.checkNotNull(records);
@@ -139,9 +124,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
           });
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void delete(K key) {
     try {
@@ -151,9 +133,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
     Preconditions.checkNotNull(key);
@@ -164,9 +143,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
           });
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void deleteAll(List<K> keys) {
     try {
@@ -176,9 +152,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
     Preconditions.checkNotNull(keys);
@@ -193,9 +166,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
           });
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void flush() {
     try {
@@ -210,9 +180,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void close() {
     writeFn.close();

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index f0d781a..1b6bfea 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -103,9 +103,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
     this.logger = LoggerFactory.getLogger(getClass().getName() + "-" + tableId);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void init(Context context) {
     readMetrics = new DefaultTableReadMetrics(context, this, tableId);
@@ -113,9 +110,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
     readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public V get(K key) {
     try {
@@ -141,9 +135,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
           });
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public Map<K, V> getAll(List<K> keys) {
     readMetrics.numGetAlls.inc();
@@ -297,9 +288,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
     return ioFuture;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void close() {
     readFn.close();

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 4b17e23..8f0b2fd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -19,8 +19,9 @@
 
 package org.apache.samza.table.remote;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -38,7 +39,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
-
 /**
  * Provide for remote table instances
  */
@@ -57,31 +57,30 @@ public class RemoteTableProvider extends BaseTableProvider {
   private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
   private static ScheduledExecutorService retryExecutor;
 
-  public RemoteTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
-    this.readOnly = !tableSpec.getConfig().containsKey(RemoteTableDescriptor.WRITE_FN);
+  public RemoteTableProvider(String tableId, Config config) {
+    super(tableId, config);
+    JavaTableConfig tableConfig = new JavaTableConfig(config);
+    this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public Table getTable() {
     RemoteReadableTable table;
-    String tableId = tableSpec.getId();
 
-    TableReadFunction readFn = getReadFn();
-    RateLimiter rateLimiter = deserializeObject(RemoteTableDescriptor.RATE_LIMITER);
+    JavaTableConfig tableConfig = new JavaTableConfig(config);
+
+    TableReadFunction readFn = getReadFn(tableConfig);
+    RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
     if (rateLimiter != null) {
       rateLimiter.init(this.context);
     }
-    TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(RemoteTableDescriptor.READ_CREDIT_FN);
-    TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
+    TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
+    TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
 
     TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
     TableRateLimiter writeRateLimiter = null;
 
-    TableRetryPolicy readRetryPolicy = deserializeObject(RemoteTableDescriptor.READ_RETRY_POLICY);
+    TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
     TableRetryPolicy writeRetryPolicy = null;
 
     if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
@@ -97,21 +96,21 @@ public class RemoteTableProvider extends BaseTableProvider {
       readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
     }
 
-    TableWriteFunction writeFn = getWriteFn();
+    TableWriteFunction writeFn = getWriteFn(tableConfig);
 
     boolean isRateLimited = readRateLimiter.isRateLimited();
     if (!readOnly) {
-      writeCreditFn = deserializeObject(RemoteTableDescriptor.WRITE_CREDIT_FN);
-      writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
+      writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+      writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
       isRateLimited |= writeRateLimiter.isRateLimited();
-      writeRetryPolicy = deserializeObject(RemoteTableDescriptor.WRITE_RETRY_POLICY);
+      writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
       if (writeRetryPolicy != null) {
         writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
       }
     }
 
     // Optional executor for future callback/completion. Shared by both read and write operations.
-    int callbackPoolSize = Integer.parseInt(tableSpec.getConfig().get(RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE));
+    int callbackPoolSize = Integer.parseInt(tableConfig.getForTable(tableId, RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE, "-1"));
     if (callbackPoolSize > 0) {
       callbackExecutors.computeIfAbsent(tableId, (arg) ->
           Executors.newFixedThreadPool(callbackPoolSize, (runnable) -> {
@@ -133,10 +132,10 @@ public class RemoteTableProvider extends BaseTableProvider {
     }
 
     if (readOnly) {
-      table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
+      table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
           tableExecutors.get(tableId), callbackExecutors.get(tableId));
     } else {
-      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
+      table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
           writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
     }
 
@@ -153,34 +152,32 @@ public class RemoteTableProvider extends BaseTableProvider {
     return table;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void close() {
+    super.close();
     tables.forEach(t -> t.close());
     tableExecutors.values().forEach(e -> e.shutdown());
     callbackExecutors.values().forEach(e -> e.shutdown());
   }
 
-  private <T> T deserializeObject(String key) {
-    String entry = tableSpec.getConfig().getOrDefault(key, "");
+  private <T> T deserializeObject(JavaTableConfig tableConfig, String key) {
+    String entry = tableConfig.getForTable(tableId, key, "");
     if (entry.isEmpty()) {
       return null;
     }
     return SerdeUtils.deserialize(key, entry);
   }
 
-  private TableReadFunction<?, ?> getReadFn() {
-    TableReadFunction<?, ?> readFn = deserializeObject(RemoteTableDescriptor.READ_FN);
+  private TableReadFunction<?, ?> getReadFn(JavaTableConfig tableConfig) {
+    TableReadFunction<?, ?> readFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_FN);
     if (readFn != null) {
       readFn.init(this.context);
     }
     return readFn;
   }
 
-  private TableWriteFunction<?, ?> getWriteFn() {
-    TableWriteFunction<?, ?> writeFn = deserializeObject(RemoteTableDescriptor.WRITE_FN);
+  private TableWriteFunction<?, ?> getWriteFn(JavaTableConfig tableConfig) {
+    TableWriteFunction<?, ?> writeFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_FN);
     if (writeFn != null) {
       writeFn.init(this.context);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
index 0eb88fd..723288d 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
@@ -19,20 +19,16 @@
 
 package org.apache.samza.table.remote;
 
+import org.apache.samza.config.Config;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-import com.google.common.base.Preconditions;
-
 
 /**
  * Factory class for a remote table provider
  */
 public class RemoteTableProviderFactory implements TableProviderFactory {
   @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    Preconditions.checkNotNull(tableSpec, "null table spec");
-    return new RemoteTableProvider(tableSpec);
+  public TableProvider getTableProvider(String tableId, Config config) {
+    return new RemoteTableProvider(tableId, config);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 87ab86b..7b4410e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -637,7 +637,7 @@ object SamzaContainer extends Logging {
           new SystemClock)
       }
 
-      val tableManager = new TableManager(config, serdes.asJava)
+      val tableManager = new TableManager(config)
 
       info("Got table manager")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
index 3b680fc..141d31b 100644
--- a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
@@ -54,7 +54,6 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.junit.Test;
 
@@ -514,16 +513,14 @@ public class TestStreamApplicationDescriptorImpl {
   public void testGetTable() throws Exception {
     Config mockConfig = getConfig();
 
+    String tableId = "t1";
     BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
-    when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
-    when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
-    when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
+    when(mockTableDescriptor.getTableId()).thenReturn(tableId);
     AtomicReference<TableImpl> table = new AtomicReference<>();
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         table.set((TableImpl) appDesc.getTable(mockTableDescriptor));
       }, mockConfig);
-    assertEquals(testTableSpec.getId(), table.get().getTableSpec().getId());
+    assertEquals(tableId, table.get().getTableId());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
index 14b480d..c870e80 100644
--- a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
@@ -33,7 +33,6 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.task.TaskFactory;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,8 +72,6 @@ public class TestTaskApplicationDescriptorImpl {
       BaseTableDescriptor mock2 = mock(BaseTableDescriptor.class);
       when(mock1.getTableId()).thenReturn("test-table1");
       when(mock2.getTableId()).thenReturn("test-table2");
-      when(mock1.getSerde()).thenReturn(mock(KVSerde.class));
-      when(mock2.getSerde()).thenReturn(mock(KVSerde.class));
       this.add(mock1);
       this.add(mock2);
     } };

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 299d631..7908764 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -41,12 +41,12 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericOutputDescriptor;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -65,7 +65,7 @@ import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor;
 import org.apache.samza.testUtils.StreamTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -264,7 +264,8 @@ public class TestExecutionPlanner {
         MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
         OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
 
-        TableDescriptor tableDescriptor = new TestTableDescriptor("table-id");
+        TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+            "table-id", new KVSerde(new StringSerde(), new StringSerde()));
         Table table = appDesc.getTable(tableDescriptor);
 
         messageStream2
@@ -352,7 +353,8 @@ public class TestExecutionPlanner {
         MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
         OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
 
-        TableDescriptor tableDescriptor = new TestTableDescriptor("table-id");
+        TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+          "table-id", new KVSerde(new StringSerde(), new StringSerde()));
         Table table = appDesc.getTable(tableDescriptor);
 
         messageStream1.sendTo(table);
@@ -378,8 +380,10 @@ public class TestExecutionPlanner {
         MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
         OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
 
-        TableDescriptor tableDescriptor = new TestTableDescriptor("table-id", Arrays.asList("input1"),
-            (message, store) -> Collections.emptyList());
+        TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+          "table-id", new KVSerde(new StringSerde(), new StringSerde()))
+            .withSideInputs(Arrays.asList("input1"))
+            .withSideInputsProcessor(mock(SideInputsProcessor.class));
         Table table = appDesc.getTable(tableDescriptor);
 
         messageStream2
@@ -404,8 +408,10 @@ public class TestExecutionPlanner {
         MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
         OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
 
-        TableDescriptor tableDescriptor = new TestTableDescriptor("table-id", Arrays.asList("input2"),
-            (message, store) -> Collections.emptyList());
+        TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+          "table-id", new KVSerde(new StringSerde(), new StringSerde()))
+            .withSideInputs(Arrays.asList("input2"))
+            .withSideInputsProcessor(mock(SideInputsProcessor.class));
         Table table = appDesc.getTable(tableDescriptor);
 
         messageStream1
@@ -822,26 +828,4 @@ public class TestExecutionPlanner {
 
     }
   }
-
-  private static class TestTableDescriptor extends BaseTableDescriptor implements TableDescriptor {
-    private final List<String> sideInputs;
-    private final SideInputsProcessor sideInputsProcessor;
-
-    public TestTableDescriptor(String tableId) {
-      this(tableId, Collections.emptyList(), null);
-    }
-
-    public TestTableDescriptor(String tableId, List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
-      super(tableId);
-      this.sideInputs = sideInputs;
-      this.sideInputsProcessor = sideInputsProcessor;
-    }
-
-    @Override
-    public TableSpec getTableSpec() {
-      validate();
-      return new TableSpec(tableId, serde, "dummyTableProviderFactoryClassName",
-          Collections.emptyMap(), sideInputs, sideInputsProcessor);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index b47014d..b620901 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -48,6 +48,8 @@ import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor;
 import org.apache.samza.testUtils.StreamTestUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.hamcrest.Matchers;
@@ -76,6 +78,8 @@ public class TestJobGraphJsonGenerator {
   private GenericInputDescriptor<KV<String, Object>> input1Descriptor;
   private GenericInputDescriptor<KV<String, Object>> input2Descriptor;
   private GenericOutputDescriptor<KV<String, Object>> outputDescriptor;
+  private TableDescriptor<String, Object, ?> table1Descriptor;
+  private TableDescriptor<String, Object, ?> table2Descriptor;
 
   @Before
   public void setUp() {
@@ -93,6 +97,8 @@ public class TestJobGraphJsonGenerator {
     input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", defaultSerde);
     input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", defaultSerde);
     outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", defaultSerde);
+    table1Descriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor("table1", defaultSerde);
+    table2Descriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor("table2", defaultSerde);
 
     Map<String, String> configs = new HashMap<>();
     configs.put(JobConfig.JOB_NAME(), "jobName");
@@ -117,6 +123,11 @@ public class TestJobGraphJsonGenerator {
     when(mockJobNode.getJobName()).thenReturn("jobName");
     when(mockJobNode.getJobId()).thenReturn("jobId");
     when(mockJobNode.getJobNameAndId()).thenReturn(JobNode.createJobNameAndId("jobName", "jobId"));
+
+    Map<String, TableDescriptor> tables = new HashMap<>();
+    tables.put(table1Descriptor.getTableId(), table1Descriptor);
+    tables.put(table2Descriptor.getTableId(), table2Descriptor);
+    when(mockJobNode.getTables()).thenReturn(tables);
   }
 
   @Test
@@ -305,6 +316,8 @@ public class TestJobGraphJsonGenerator {
     when(mockJobGraph.getInputStreams()).thenReturn(inEdges);
     when(mockJobGraph.getOutputStreams()).thenReturn(outEdges);
     when(mockJobGraph.getIntermediateStreamEdges()).thenReturn(intermediateEdges);
+    Set<TableDescriptor> tables = new HashSet<>(mockJobNode.getTables().values());
+    when(mockJobGraph.getTables()).thenReturn(tables);
     when(mockJobGraph.getJobNodes()).thenReturn(Collections.singletonList(mockJobNode));
     String graphJson = jsonGenerator.toJson(mockJobGraph);
     ObjectMapper objectMapper = new ObjectMapper();
@@ -317,6 +330,8 @@ public class TestJobGraphJsonGenerator {
     assertThat(jsonObject.sinkStreams.keySet(), Matchers.containsInAnyOrder(outStreamIds.toArray()));
     Set<String> intStreamIds = intermediateEdges.stream().map(stream -> stream.getStreamSpec().getId()).collect(Collectors.toSet());
     assertThat(jsonObject.intermediateStreams.keySet(), Matchers.containsInAnyOrder(intStreamIds.toArray()));
+    Set<String> tableIds = tables.stream().map(t -> t.getTableId()).collect(Collectors.toSet());
+    assertThat(jsonObject.tables.keySet(), Matchers.containsInAnyOrder(tableIds.toArray()));
     JobGraphJsonGenerator.JobNodeJson expectedNodeJson = new JobGraphJsonGenerator.JobNodeJson();
     expectedNodeJson.jobId = mockJobNode.getJobId();
     expectedNodeJson.jobName = mockJobNode.getJobName();

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 86d339e..1c4abc6 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.samza.execution;
 
 import com.google.common.base.Joiner;
+import java.util.Arrays;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
@@ -28,38 +29,31 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.context.Context;
+import org.apache.samza.storage.SideInputsProcessor;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor.MockLocalTableDescriptor;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor.MockTableProviderFactory;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 /**
@@ -161,17 +155,10 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
     mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
     // add table to the RepartitionJoinStreamApplication
     GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
-    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    TableSpec mockTableSpec = mock(TableSpec.class);
-    when(mockTableSpec.getId()).thenReturn("testTable");
-    when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde);
-    when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName());
-    List<String> sideInputs = new ArrayList<>();
-    sideInputs.add(sideInput1.getStreamId());
-    when(mockTableSpec.getSideInputs()).thenReturn(sideInputs);
-    when(mockTableDescriptor.getTableId()).thenReturn("testTable");
-    when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec);
-    when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde);
+    BaseTableDescriptor mockTableDescriptor = new MockLocalTableDescriptor("testTable", defaultSerde)
+        .withSideInputs(Arrays.asList(sideInput1.getStreamId()))
+        .withSideInputsProcessor(mock(SideInputsProcessor.class, withSettings().serializable()))
+        .withConfig("mock.table.provider.config", "mock.config.value");
     // add side input and terminate at table in the appplication
     mockStreamAppDesc.getInputStream(sideInput1).sendTo(mockStreamAppDesc.getTable(mockTableDescriptor));
     StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1",
@@ -198,17 +185,10 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
   public void testTaskApplicationWithTableAndSideInput() {
     // add table to the RepartitionJoinStreamApplication
     GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
-    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    TableSpec mockTableSpec = mock(TableSpec.class);
-    when(mockTableSpec.getId()).thenReturn("testTable");
-    when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde);
-    when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName());
-    List<String> sideInputs = new ArrayList<>();
-    sideInputs.add(sideInput1.getStreamId());
-    when(mockTableSpec.getSideInputs()).thenReturn(sideInputs);
-    when(mockTableDescriptor.getTableId()).thenReturn("testTable");
-    when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec);
-    when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde);
+    BaseTableDescriptor mockTableDescriptor = new MockLocalTableDescriptor("testTable", defaultSerde)
+        .withSideInputs(Arrays.asList(sideInput1.getStreamId()))
+        .withSideInputsProcessor(mock(SideInputsProcessor.class, withSettings().serializable()))
+        .withConfig("mock.table.provider.config", "mock.config.value");
     StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1",
         inputSystemDescriptor.getSystemName()), false, false, mockConfig);
     // need to put the sideInput related stream configuration to the original config
@@ -260,9 +240,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
       TableDescriptor tableDescriptor) {
     Config tableConfig = jobConfig.subset(String.format("tables.%s.", tableDescriptor.getTableId()));
     assertEquals(MockTableProviderFactory.class.getName(), tableConfig.get("provider.factory"));
-    MockTableProvider mockTableProvider =
-        (MockTableProvider) new MockTableProviderFactory().getTableProvider(((BaseTableDescriptor) tableDescriptor).getTableSpec());
-    assertEquals(mockTableProvider.configMap.get("mock.table.provider.config"), jobConfig.get("mock.table.provider.config"));
+    assertEquals("mock.config.value", jobConfig.get("mock.table.provider.config"));
     validateTableSerdeConfigure(tableDescriptor.getTableId(), jobConfig, deserializedSerdes);
   }
 
@@ -316,9 +294,9 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
   }
 
   private void validateTableSerdeConfigure(String tableId, Config config, Map<String, Serde> deserializedSerdes) {
-    Config streamConfig = config.subset(String.format("tables.%s.", tableId));
+    Config streamConfig = config.subset(String.format("stores.%s.", tableId));
     String keySerdeName = streamConfig.get("key.serde");
-    String valueSerdeName = streamConfig.get("value.serde");
+    String valueSerdeName = streamConfig.get("msg.serde");
     assertTrue(String.format("Serialized serdes should contain %s key serde", tableId), deserializedSerdes.containsKey(keySerdeName));
     assertTrue(String.format("Serialized %s key serde should be a StringSerde", tableId), keySerdeName.startsWith(StringSerde.class.getSimpleName()));
     assertTrue(String.format("Serialized serdes should contain %s value serde", tableId), deserializedSerdes.containsKey(valueSerdeName));
@@ -387,46 +365,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
     assertEquals("3600000", joinStoreConfig.get("rocksdb.ttl.ms"));
   }
 
-  private static class MockTableProvider implements TableProvider {
-    private final Map<String, String> configMap;
-
-    MockTableProvider(Map<String, String> configMap) {
-      this.configMap = configMap;
-    }
-
-    @Override
-    public void init(Context context) {
-
-    }
-
-    @Override
-    public Table getTable() {
-      return null;
-    }
-
-    @Override
-    public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
-      return configMap;
-    }
-
-    @Override
-    public void close() {
-
-    }
-  }
-
-  public static class MockTableProviderFactory implements TableProviderFactory {
-
-    @Override
-    public TableProvider getTableProvider(TableSpec tableSpec) {
-      Map<String, String> configMap = new HashMap<>();
-      configMap.put("mock.table.provider.config", "mock.config.value");
-      return new MockTableProvider(configMap);
-    }
-  }
-
   public static class MockConfigRewriter implements ConfigRewriter {
-
     @Override
     public Config rewrite(String name, Config config) {
       Map<String, String> configMap = new HashMap<>(config);
@@ -434,4 +373,5 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
       return new MapConfig(configMap);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index ce9435f..ab7cf5f 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -52,7 +52,6 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -292,8 +291,7 @@ public class TestMessageStreamImpl {
     OperatorSpec inputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec);
 
-    TableSpec tableSpec = new TableSpec();
-    TableImpl table = new TableImpl(tableSpec);
+    TableImpl table = new TableImpl("t1");
 
     source.sendTo(table);
 
@@ -305,7 +303,7 @@ public class TestMessageStreamImpl {
     SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) registeredOpSpec;
 
     assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode());
-    assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec());
+    assertEquals(table.getTableId(), sendToTableOperatorSpec.getTableId());
   }
 
   @Test
@@ -316,8 +314,7 @@ public class TestMessageStreamImpl {
     OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
 
-    TableSpec tableSpec = new TableSpec();
-    TableImpl table = new TableImpl(tableSpec);
+    TableImpl table = new TableImpl("t1");
 
     source2.sendTo(table);
 
@@ -333,7 +330,7 @@ public class TestMessageStreamImpl {
     StreamTableJoinOperatorSpec joinOpSpec = (StreamTableJoinOperatorSpec) leftRegisteredOpSpec;
     assertEquals(OpCode.JOIN, joinOpSpec.getOpCode());
     assertEquals(mockJoinFn, joinOpSpec.getJoinFn());
-    assertEquals(tableSpec, joinOpSpec.getTableSpec());
+    assertEquals(table.getTableId(), joinOpSpec.getTableId());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 9083495..4112c8b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -27,7 +27,6 @@ import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.TableSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
@@ -43,11 +42,9 @@ public class TestStreamTableJoinOperatorImpl {
   public void testHandleMessage() {
 
     String tableId = "t1";
-    TableSpec tableSpec = mock(TableSpec.class);
-    when(tableSpec.getId()).thenReturn(tableId);
 
     StreamTableJoinOperatorSpec mockJoinOpSpec = mock(StreamTableJoinOperatorSpec.class);
-    when(mockJoinOpSpec.getTableSpec()).thenReturn(tableSpec);
+    when(mockJoinOpSpec.getTableId()).thenReturn(tableId);
     when(mockJoinOpSpec.getJoinFn()).thenReturn(
         new StreamTableJoinFunction<String, KV<String, String>, KV<String, String>, String>() {
           @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
index 454a661..b93544f 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
@@ -31,7 +31,6 @@ import org.apache.samza.operators.TableImpl;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.table.TableSpec;
 
 import static org.junit.Assert.*;
 
@@ -90,17 +89,9 @@ public class OperatorSpecTestUtils {
     assertNotEquals(scheduledFn, scheduledFn1);
   }
 
-  private static void assertClonedTables(Map<TableSpec, TableImpl> originalTables, Map<TableSpec, TableImpl> clonedTables) {
-    assertEquals(originalTables.size(), clonedTables.size());
-    assertEquals(originalTables.keySet(), clonedTables.keySet());
-    Iterator<TableImpl> oIter = originalTables.values().iterator();
-    Iterator<TableImpl> nIter = clonedTables.values().iterator();
-    oIter.forEachRemaining(oTable -> assertClonedTableImpl(oTable, nIter.next()));
-  }
-
   private static void assertClonedTableImpl(TableImpl oTable, TableImpl nTable) {
     assertNotEquals(oTable, nTable);
-    assertEquals(oTable.getTableSpec(), nTable.getTableSpec());
+    assertEquals(oTable.getTableId(), nTable.getTableId());
   }
 
   private static void assertClonedOutputs(Map<String, OutputStreamImpl> originalOutputs,

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index 6e91e2a..fd908ca 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -20,9 +20,7 @@ package org.apache.samza.operators.spec;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.data.TestMessageEnvelope;
@@ -36,19 +34,13 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 
 /**
@@ -305,32 +297,28 @@ public class TestOperatorSpec {
   public void testStreamTableJoinOperatorSpec() {
     StreamTableJoinFunction<String, Object, Object, TestOutputMessageEnvelope> joinFn = new TestStreamTableJoinFunction();
 
-    TableSpec tableSpec = new TableSpec("table-0", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>()), "my.table.provider.class",
-        new MapConfig(new HashMap<String, String>() { { this.put("config1", "value1"); this.put("config2", "value2"); } }));
+    String tableId = "t1";
 
     StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> joinOperatorSpec =
-        new StreamTableJoinOperatorSpec<>(tableSpec, joinFn, "join-3");
+        new StreamTableJoinOperatorSpec<>(tableId, joinFn, "join-3");
 
     StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> joinOpSpecCopy =
         (StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(joinOperatorSpec);
     assertNotEquals(joinOpSpecCopy, joinOperatorSpec);
     assertEquals(joinOpSpecCopy.getOpId(), joinOperatorSpec.getOpId());
-    assertTrue(joinOpSpecCopy.getTableSpec() != joinOperatorSpec.getTableSpec());
-    assertEquals(joinOpSpecCopy.getTableSpec().getId(), joinOperatorSpec.getTableSpec().getId());
-    assertEquals(joinOpSpecCopy.getTableSpec().getTableProviderFactoryClassName(), joinOperatorSpec.getTableSpec().getTableProviderFactoryClassName());
+    assertEquals(joinOpSpecCopy.getTableId(), joinOperatorSpec.getTableId());
   }
 
   @Test
   public void testSendToTableOperatorSpec() {
-    TableSpec tableSpec = new TableSpec("table-0", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>()), "my.table.provider.class",
-        new MapConfig(new HashMap<String, String>() { { this.put("config1", "value1"); this.put("config2", "value2"); } }));
+    String tableId = "t1";
     SendToTableOperatorSpec<String, Integer> sendOpSpec =
-        new SendToTableOperatorSpec<>(tableSpec, "output-1");
+        new SendToTableOperatorSpec<>(tableId, "output-1");
     SendToTableOperatorSpec<String, Integer> sendToCopy = (SendToTableOperatorSpec<String, Integer>) OperatorSpecTestUtils
         .copyOpSpec(sendOpSpec);
     assertNotEquals(sendToCopy, sendOpSpec);
     assertEquals(sendToCopy.getOpId(), sendOpSpec.getOpId());
-    assertTrue(sendToCopy.getTableSpec() != sendOpSpec.getTableSpec() && sendToCopy.getTableSpec().equals(sendOpSpec.getTableSpec()));
+    assertTrue(sendToCopy.getTableId().equals(sendOpSpec.getTableId()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 0952a87..1bbf3af 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -20,6 +20,7 @@ package org.apache.samza.table;
 
 import junit.framework.Assert;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
@@ -53,7 +54,7 @@ public class TestTableManager {
     static TableProvider tableProvider;
 
     @Override
-    public TableProvider getTableProvider(TableSpec tableSpec) {
+    public TableProvider getTableProvider(String tableId, Config config) {
       table = mock(ReadableTable.class);
       tableProvider = mock(TableProvider.class);
       when(tableProvider.getTable()).thenReturn(table);
@@ -79,47 +80,16 @@ public class TestTableManager {
     doTestInit(map);
   }
 
-  @Test(expected = Exception.class)
-  public void testInitFailsWithoutKeySerde() {
-    Map<String, String> map = new HashMap<>();
-    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
-    addValueSerde(map);
-    doTestInit(map);
-  }
-
-  @Test(expected = Exception.class)
-  public void testInitFailsWithoutValueSerde() {
-    Map<String, String> map = new HashMap<>();
-    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
-    addValueSerde(map);
-    doTestInit(map);
-  }
-
   @Test(expected = IllegalStateException.class)
-  public void testInitFailsWithoutInitializingLocalTables() {
-    TableManager tableManager = new TableManager(new MapConfig(new HashMap<>()), new HashMap<>());
+  public void testInitFailsWithoutInitializingLocalStores() {
+    TableManager tableManager = new TableManager(new MapConfig(new HashMap<>()));
     tableManager.getTable("dummy");
   }
 
   private void doTestInit(Map<String, String> map) {
     Map<String, StorageEngine> storageEngines = new HashMap<>();
     storageEngines.put(TABLE_ID, mock(StorageEngine.class));
-
-    Map<String, Serde<Object>> serdeMap = new HashMap<>();
-    SerializableSerde<Serde> serializableSerde = new SerializableSerde();
-    map.keySet().stream()
-        .filter(k -> k.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX()))
-        .forEach(k -> {
-            String serdeName = k
-                .replace(String.format(SerializerConfig.SERIALIZER_PREFIX(), ""), "")
-                .replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), "");
-            String serializedSerde = map.get(k);
-            byte[] bytes = Base64.getDecoder().decode(serializedSerde);
-            Serde serde = serializableSerde.fromBytes(bytes);
-            serdeMap.put(serdeName, serde);
-          });
-
-    TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
+    TableManager tableManager = new TableManager(new MapConfig(map));
     tableManager.init(new MockContext());
 
     for (int i = 0; i < 2; i++) {
@@ -131,13 +101,7 @@ public class TestTableManager {
 
     Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tableContexts");
     TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
-
-    TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
-    Assert.assertEquals(TABLE_ID, tableSpec.getId());
-    Assert.assertEquals(DummyTableProviderFactory.class.getName(), tableSpec.getTableProviderFactoryClassName());
-    Assert.assertEquals(IntegerSerde.class, tableSpec.getSerde().getKeySerde().getClass());
-    Assert.assertEquals(StringSerde.class, tableSpec.getSerde().getValueSerde().getClass());
-    Assert.assertEquals("xyz", tableSpec.getConfig().get("some.config"));
+    Assert.assertEquals(TABLE_ID, ctxMap.keySet().iterator().next());
 
     TableProvider tableProvider = getFieldValue(ctx, "tableProvider");
     Assert.assertNotNull(tableProvider);
@@ -147,14 +111,14 @@ public class TestTableManager {
     String serdeId = "key-serde";
     map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
         serializeSerde(new IntegerSerde()));
-    map.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, TABLE_ID), serdeId);
+    map.put(String.format(JavaTableConfig.STORE_KEY_SERDE, TABLE_ID), serdeId);
   }
 
   private void addValueSerde(Map<String, String> map) {
     String serdeId = "value-serde";
     map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
             serializeSerde(new StringSerde("UTF-8")));
-    map.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, TABLE_ID), serdeId);
+    map.put(String.format(JavaTableConfig.STORE_MSG_SERDE, TABLE_ID), serdeId);
   }
 
   private String serializeSerde(Serde serde) {

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index e29f6e6..36ec46f 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -22,6 +22,9 @@ package org.apache.samza.table.caching;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
 import org.apache.samza.metrics.Counter;
@@ -33,7 +36,6 @@ import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
@@ -65,7 +67,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
 public class TestCachingTable {
   @Test
   public void testSerializeSimple() {
@@ -74,10 +75,11 @@ public class TestCachingTable {
 
   @Test
   public void testSerializeWithCacheInstance() {
-    GuavaCacheTableDescriptor guavaTableDesc = new GuavaCacheTableDescriptor("guavaCacheId");
-    guavaTableDesc.withCache(CacheBuilder.newBuilder().build());
-    TableSpec spec = guavaTableDesc.getTableSpec();
-    Assert.assertTrue(spec.getConfig().containsKey(GuavaCacheTableDescriptor.GUAVA_CACHE));
+    String tableId = "guavaCacheId";
+    GuavaCacheTableDescriptor guavaTableDesc = new GuavaCacheTableDescriptor(tableId)
+        .withCache(CacheBuilder.newBuilder().build());
+    Map<String, String> tableConfig = guavaTableDesc.toConfig(new MapConfig());
+    assertExists(GuavaCacheTableDescriptor.GUAVA_CACHE, tableId, tableConfig);
     doTestSerialize(guavaTableDesc);
   }
 
@@ -85,29 +87,28 @@ public class TestCachingTable {
     CachingTableDescriptor desc;
     TableDescriptor table = createDummyTableDescriptor("2");
     if (cache == null) {
-      desc = new CachingTableDescriptor("1", table);
-      desc.withReadTtl(Duration.ofMinutes(3));
-      desc.withWriteTtl(Duration.ofMinutes(3));
-      desc.withCacheSize(1000);
+      desc = new CachingTableDescriptor("1", table)
+          .withReadTtl(Duration.ofMinutes(3))
+          .withWriteTtl(Duration.ofMinutes(4))
+          .withCacheSize(1000);
     } else {
       desc = new CachingTableDescriptor("1", table, cache);
     }
 
     desc.withWriteAround();
 
-    TableSpec spec = desc.getTableSpec();
-    Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.REAL_TABLE_ID));
+    Map<String, String> tableConfig = desc.toConfig(new MapConfig());
+
+    assertEquals("2", CachingTableDescriptor.REAL_TABLE_ID, "1", tableConfig);
 
     if (cache == null) {
-      Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.READ_TTL_MS));
-      Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.WRITE_TTL_MS));
+      assertEquals("180000", CachingTableDescriptor.READ_TTL_MS, "1", tableConfig);
+      assertEquals("240000", CachingTableDescriptor.WRITE_TTL_MS, "1", tableConfig);
     } else {
-      Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.CACHE_TABLE_ID));
+      assertEquals(cache.getTableId(), CachingTableDescriptor.CACHE_TABLE_ID, "1", tableConfig);
     }
 
-    Assert.assertEquals("true", spec.getConfig().get(CachingTableDescriptor.WRITE_AROUND));
-
-    desc.validate();
+    assertEquals("true", CachingTableDescriptor.WRITE_AROUND, "1", tableConfig);
   }
 
   private static Pair<ReadWriteTable<String, String>, Map<String, String>> getMockCache() {
@@ -157,7 +158,9 @@ public class TestCachingTable {
     if (isWriteAround) {
       desc.withWriteAround();
     }
-    CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableSpec());
+
+    Config config = new MapConfig(desc.toConfig(new MapConfig()));
+    CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableId(), config);
 
     Context context = new MockContext();
     final ReadWriteTable cacheTable = getMockCache().getLeft();
@@ -364,8 +367,16 @@ public class TestCachingTable {
   private TableDescriptor createDummyTableDescriptor(String tableId) {
     BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class);
     when(tableDescriptor.getTableId()).thenReturn(tableId);
-    when(tableDescriptor.getTableSpec()).thenReturn(
-        new TableSpec(tableId, null, null, new HashMap<>()));
     return tableDescriptor;
   }
+
+  private void assertExists(String key, String tableId, Map<String, String> config) {
+    String realKey = JavaTableConfig.buildKey(tableId, key);
+    Assert.assertTrue(config.containsKey(realKey));
+  }
+
+  private void assertEquals(String expectedValue, String key, String tableId, Map<String, String> config) {
+    String realKey = JavaTableConfig.buildKey(tableId, key);
+    Assert.assertEquals(expectedValue, config.get(realKey));
+  }
 }
\ No newline at end of file