You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/21 01:22:36 UTC

[3/3] samza git commit: SAMZA-1998: Table API refactoring

SAMZA-1998: Table API refactoring

Table API refactoring
     - Removed TableSpec
     - Consolidated configuration generation for tables to table descriptors
     - Refactored constructor so that only local table would require serde's
     - Removed table provider for RocksDB- and in-memory tables, and added LocalTableProvider
     - Updates to unit tests
     - Various refactoring

Author: Wei Song <ws...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #807 from weisong44/SAMZA-1998


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5f7a22c3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5f7a22c3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5f7a22c3

Branch: refs/heads/master
Commit: 5f7a22c3bb991352cba58bc83d0df01a0ba38835
Parents: cce2b6f
Author: Wei Song <ws...@linkedin.com>
Authored: Tue Nov 20 17:22:18 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Tue Nov 20 17:22:18 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/config/JavaTableConfig.java    | 118 +++++++++++++
 .../org/apache/samza/table/TableProvider.java   |  15 --
 .../samza/table/TableProviderFactory.java       |  10 +-
 .../java/org/apache/samza/table/TableSpec.java  | 172 -------------------
 .../table/descriptors/BaseTableDescriptor.java  |  68 +++-----
 .../descriptors/CachingTableDescriptor.java     |  70 ++++----
 .../descriptors/GuavaCacheTableDescriptor.java  |  32 ++--
 .../descriptors/HybridTableDescriptor.java      |   3 +-
 .../table/descriptors/LocalTableDescriptor.java | 105 ++++++++---
 .../descriptors/RemoteTableDescriptor.java      | 122 ++++++-------
 .../table/descriptors/TableDescriptor.java      |  17 ++
 .../samza/table/TestBaseTableDescriptor.java    |  73 ++++++++
 .../StreamApplicationDescriptorImpl.java        |  10 +-
 .../TaskApplicationDescriptorImpl.java          |   8 +-
 .../apache/samza/config/JavaTableConfig.java    |  88 ----------
 .../samza/execution/ExecutionPlanner.java       |  35 ++--
 .../org/apache/samza/execution/JobGraph.java    |  14 +-
 .../samza/execution/JobGraphJsonGenerator.java  |  41 ++---
 .../org/apache/samza/execution/JobNode.java     |  10 +-
 .../JobNodeConfigurationGenerator.java          |  38 ++--
 .../execution/OperatorSpecGraphAnalyzer.java    |  29 ++--
 .../samza/operators/MessageStreamImpl.java      |   6 +-
 .../org/apache/samza/operators/TableImpl.java   |  18 +-
 .../operators/impl/SendToTableOperatorImpl.java |   2 +-
 .../impl/StreamTableJoinOperatorImpl.java       |   2 +-
 .../samza/operators/spec/OperatorSpecs.java     |  14 +-
 .../operators/spec/SendToTableOperatorSpec.java |  14 +-
 .../spec/StreamTableJoinOperatorSpec.java       |  14 +-
 .../apache/samza/table/BaseTableProvider.java   |  40 ++---
 .../samza/table/TableConfigGenerator.java       |  66 +------
 .../org/apache/samza/table/TableManager.java    |  54 ++----
 .../samza/table/caching/CachingTable.java       |   3 -
 .../table/caching/CachingTableProvider.java     |  24 +--
 .../caching/CachingTableProviderFactory.java    |   7 +-
 .../table/caching/guava/GuavaCacheTable.java    |   3 -
 .../caching/guava/GuavaCacheTableProvider.java  |  13 +-
 .../guava/GuavaCacheTableProviderFactory.java   |   6 +-
 .../table/remote/RemoteReadWriteTable.java      |  33 ----
 .../samza/table/remote/RemoteReadableTable.java |  12 --
 .../samza/table/remote/RemoteTableProvider.java |  57 +++---
 .../remote/RemoteTableProviderFactory.java      |  10 +-
 .../apache/samza/container/SamzaContainer.scala |   2 +-
 .../TestStreamApplicationDescriptorImpl.java    |   9 +-
 .../TestTaskApplicationDescriptorImpl.java      |   3 -
 .../samza/execution/TestExecutionPlanner.java   |  44 ++---
 .../execution/TestJobGraphJsonGenerator.java    |  15 ++
 .../TestJobNodeConfigurationGenerator.java      |  94 ++--------
 .../samza/operators/TestMessageStreamImpl.java  |  11 +-
 .../impl/TestStreamTableJoinOperatorImpl.java   |   5 +-
 .../operators/spec/OperatorSpecTestUtils.java   |  11 +-
 .../samza/operators/spec/TestOperatorSpec.java  |  26 +--
 .../apache/samza/table/TestTableManager.java    |  52 +-----
 .../samza/table/caching/TestCachingTable.java   |  53 +++---
 .../descriptors/TestLocalTableDescriptor.java   | 162 +++++++++++++++++
 .../descriptors/TestRemoteTableDescriptor.java  |  88 ++++++----
 .../descriptors/InMemoryTableDescriptor.java    |  38 ++--
 .../descriptors/InMemoryTableProvider.java      |  71 --------
 .../InMemoryTableProviderFactory.java           |  33 ----
 .../inmemory/TestInMemoryTableDescriptor.java   |  65 +++++++
 .../TestInMemoryTableDescriptor.java            |  55 ------
 .../descriptors/TestInMemoryTableProvider.java  |  67 --------
 .../kv/descriptors/RocksDbTableDescriptor.java  |  64 +++----
 .../kv/descriptors/RocksDbTableProvider.java    |  75 --------
 .../RocksDbTableProviderFactory.java            |  31 ----
 .../descriptors/TestRocksDbTableDescriptor.java |  97 ++++++-----
 .../descriptors/TestRocksDbTableProvider.java   |  68 --------
 .../samza/storage/kv/LocalReadWriteTable.java   |   3 -
 .../samza/storage/kv/LocalReadableTable.java    |   3 -
 .../samza/storage/kv/LocalTableProvider.java    | 103 +----------
 .../storage/kv/LocalTableProviderFactory.java   |  31 ++++
 .../kv/descriptors/TestLocalTableProvider.java  | 103 +----------
 .../sql/translator/TranslatorTestBase.java      |  11 +-
 .../samza/example/TaskApplicationExample.java   |   5 +-
 73 files changed, 1185 insertions(+), 1789 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
new file mode 100644
index 0000000..c4381a1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class for handling table configuration
+ */
+public class JavaTableConfig extends MapConfig {
+
+  // Prefix
+  public static final String TABLES_PREFIX = "tables.";
+  public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s";
+
+  // Suffix
+  public static final String TABLE_PROVIDER_FACTORY_SUFFIX = ".provider.factory";
+
+  // Config keys
+  public static final String TABLE_PROVIDER_FACTORY = String.format("%s.provider.factory", TABLE_ID_PREFIX);
+  public static final String STORE_KEY_SERDE = "stores.%s.key.serde";
+  public static final String STORE_MSG_SERDE = "stores.%s.msg.serde";
+
+  public JavaTableConfig(Map<String, String> config) {
+    super(config);
+  }
+
+  /**
+   * Get Id's of all tables
+   * @return list of table Id's
+   */
+  public List<String> getTableIds() {
+    Config subConfig = subset(TABLES_PREFIX, true);
+    Set<String> tableNames = subConfig.keySet().stream()
+        .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
+        .map(k -> k.replace(TABLE_PROVIDER_FACTORY_SUFFIX, ""))
+        .collect(Collectors.toSet());
+    return new LinkedList<>(tableNames);
+  }
+
+  /**
+   * Get the {@link org.apache.samza.table.TableProviderFactory} class name for a table
+   * @param tableId Id of the table
+   * @return the {@link org.apache.samza.table.TableProviderFactory} class name
+   */
+  public String getTableProviderFactory(String tableId) {
+    return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
+  }
+
+  /**
+   * Get registry keys of key serde for this table
+   * @param tableId Id of the table
+   * @return serde retistry key
+   */
+  public String getKeySerde(String tableId) {
+    return get(String.format(STORE_KEY_SERDE, tableId), null);
+  }
+
+  /**
+   * Get registry keys of value serde for this table
+   * @param tableId Id of the table
+   * @return serde retistry key
+   */
+  public String getValueSerde(String tableId) {
+    return get(String.format(STORE_MSG_SERDE, tableId), null);
+  }
+
+  /**
+   * Get table config value for a key
+   * @param tableId Id of the table
+   * @param key Key for the config item
+   * @param defaultValue default value if absent in config
+   * @return config value for the key
+   */
+  public String getForTable(String tableId, String key, String defaultValue) {
+    return get(buildKey(tableId, key), defaultValue);
+  }
+
+  /**
+   * Get table config value for a key
+   * @param tableId Id of the table
+   * @param key Key for the config item
+   * @return config value for the key
+   */
+  public String getForTable(String tableId, String key) {
+    return getForTable(tableId, key, null);
+  }
+
+  /**
+   * Build complete config key for a config item
+   * @param tableId Id of the table
+   * @param key Key for the config item
+   * @return the complete config key
+   */
+  static public String buildKey(String tableId, String key) {
+    return String.format(TABLE_ID_PREFIX + ".%s", tableId, key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 350324c..2dec989 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -18,9 +18,7 @@
  */
 package org.apache.samza.table;
 
-import java.util.Map;
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 
 /**
@@ -42,19 +40,6 @@ public interface TableProvider {
   Table getTable();
 
   /**
-   * Generate any configuration for this table, the generated configuration
-   * is used by Samza container to construct this table and any components
-   * necessary. Instead of manipulating the input parameters, this method
-   * should return the generated configuration.
-   *
-   * @param jobConfig the job config
-   * @param generatedConfig config generated by earlier processing, but has
-   *                        not yet been merged to job config
-   * @return configuration for this table
-   */
-  Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
-
-  /**
    * Shutdown the underlying table
    */
   void close();

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
index 1bb0196..86214a5 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.samza.table;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
 
 
 /**
@@ -27,9 +28,10 @@ import org.apache.samza.annotation.InterfaceStability;
 @InterfaceStability.Unstable
 public interface TableProviderFactory {
   /**
-   * Constructs an instances of the table provider based on a given table spec
-   * @param tableSpec the table spec
-   * @return the table provider
+   * Construct a table provider based on job configuration
+   * @param tableId Id of the table
+   * @param config Job configuration
+   * @return the constructed table provider
    */
-  TableProvider getTableProvider(TableSpec tableSpec);
+  TableProvider getTableProvider(String tableId, Config config);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
deleted file mode 100644
index 2883a93..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.storage.SideInputsProcessor;
-
-
-/**
- * TableSpec is a blueprint for creating, validating, or simply describing a table in the runtime environment.
- *
- * It is typically created indirectly by constructing an instance of
- * {@link org.apache.samza.table.descriptors.TableDescriptor}, and then invoke
- * <code>BaseTableDescriptor.getTableSpec()</code>.
- *
- * It has specific attributes for common behaviors that Samza uses.
- *
- * It has the table provider factory, which provides the actual table implementation.
- *
- * It also includes a map of configurations which may be implementation-specific.
- *
- * It is immutable by design.
- */
-@InterfaceStability.Unstable
-public class TableSpec implements Serializable {
-
-  private final String id;
-  private final String tableProviderFactoryClassName;
-
-  /**
-   * The following fields are serialized by the ExecutionPlanner when generating the configs for a table, and deserialized
-   * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis
-   */
-  private transient final KVSerde serde;
-  private transient final List<String> sideInputs;
-  private transient final SideInputsProcessor sideInputsProcessor;
-  private transient final Map<String, String> config = new HashMap<>();
-
-  /**
-   * Default constructor
-   */
-  public TableSpec() {
-    this.id = null;
-    this.serde = null;
-    this.tableProviderFactoryClassName = null;
-    this.sideInputs = null;
-    this.sideInputsProcessor = null;
-  }
-
-  /**
-   * Constructs a {@link TableSpec}
-   *
-   * @param tableId Id of the table
-   * @param tableProviderFactoryClassName table provider factory
-   * @param serde the serde
-   * @param config implementation specific configuration
-   */
-  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config) {
-    this(tableId, serde, tableProviderFactoryClassName, config, Collections.emptyList(), null);
-  }
-
-  /**
-   * Constructs a {@link TableSpec}
-   *
-   * @param tableId Id of the table
-   * @param tableProviderFactoryClassName table provider factory
-   * @param serde the serde
-   * @param config implementation specific configuration
-   * @param sideInputs list of side inputs for the table
-   * @param sideInputsProcessor side input processor for the table
-   */
-  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config,
-      List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
-    this.id = tableId;
-    this.serde = serde;
-    this.tableProviderFactoryClassName = tableProviderFactoryClassName;
-    this.config.putAll(config);
-    this.sideInputs = sideInputs;
-    this.sideInputsProcessor = sideInputsProcessor;
-  }
-
-  /**
-   * Get the Id of the table
-   * @return Id of the table
-   */
-  public String getId() {
-    return id;
-  }
-
-  /**
-   * Get the serde
-   * @param <K> the type of the key
-   * @param <V> the type of the value
-   * @return the key serde
-   */
-  public <K, V> KVSerde<K, V> getSerde() {
-    return serde;
-  }
-
-  /**
-   * Get the class name of the table provider factory
-   * @return class name of the table provider factory
-   */
-  public String getTableProviderFactoryClassName() {
-    return tableProviderFactoryClassName;
-  }
-
-  /**
-   * Get implementation configuration for the table
-   * @return configuration for the table
-   */
-  public Map<String, String> getConfig() {
-    return Collections.unmodifiableMap(config);
-  }
-
-  /**
-   * Get the list of side inputs for the table.
-   *
-   * @return a {@link List} of side input streams
-   */
-  public List<String> getSideInputs() {
-    return sideInputs;
-  }
-
-  /**
-   * Get the {@link SideInputsProcessor} associated with the table.
-   *
-   * @return a {@link SideInputsProcessor}
-   */
-  public SideInputsProcessor getSideInputsProcessor() {
-    return sideInputsProcessor;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || !getClass().equals(o.getClass())) {
-      return false;
-    }
-    return id.equals(((TableSpec) o).id);
-  }
-
-  @Override
-  public int hashCode() {
-    return id.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index 246216b..d660276 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -19,13 +19,13 @@
 
 package org.apache.samza.table.descriptors;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.TableSpec;
-
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 
 /**
  * Base class for all table descriptor implementations.
@@ -34,13 +34,12 @@ import org.apache.samza.table.TableSpec;
  * @param <V> the type of the value in this table
  * @param <D> the type of the concrete table descriptor
  */
+@InterfaceStability.Unstable
 abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
     implements TableDescriptor<K, V, D> {
 
   protected final String tableId;
 
-  protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
-
   protected final Map<String, String> config = new HashMap<>();
 
   /**
@@ -51,60 +50,49 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
     this.tableId = tableId;
   }
 
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   * @param serde the serde for key and value
-   */
-  protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    this.tableId = tableId;
-    this.serde = serde;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public D withConfig(String key, String value) {
     config.put(key, value);
     return (D) this;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public String getTableId() {
     return tableId;
   }
 
-  /**
-   * Get the serde assigned to this {@link TableDescriptor}
-   *
-   * @return {@link KVSerde} used by this table
-   */
-  public KVSerde<K, V> getSerde() {
-    return serde;
+  @Override
+  public Map<String, String> toConfig(Config jobConfig) {
+
+    validate();
+
+    Map<String, String> tableConfig = new HashMap<>(config);
+    tableConfig.put(
+        String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId),
+        getProviderFactoryClassName());
+
+    return Collections.unmodifiableMap(tableConfig);
   }
 
   /**
-   * Generate config for {@link TableSpec}; this method is used internally.
-   * @param tableSpecConfig configuration for the {@link TableSpec}
+   * Return the fully qualified class name of the {@link org.apache.samza.table.TableProviderFactory}
+   * @return class name of the {@link org.apache.samza.table.TableProviderFactory}
    */
-  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
-    tableSpecConfig.putAll(config);
-  }
+  abstract public String getProviderFactoryClassName();
 
   /**
    * Validate that this table descriptor is constructed properly; this method is used internally.
    */
-  protected void validate() {
-  }
+  abstract protected void validate();
 
   /**
-   * Create a {@link TableSpec} from this table descriptor; this method is used internally.
-   *
-   * @return the {@link TableSpec}
+   * Helper method to add a config item to table configuration
+   * @param key key of the config item
+   * @param value value of the config item
+   * @param tableConfig table configuration
    */
-  abstract public TableSpec getTableSpec();
+  protected void addTableConfig(String key, String value, Map<String, String> tableConfig) {
+    tableConfig.put(JavaTableConfig.buildKey(tableId, key), value);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
index d6248c6..2ca2d44 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
@@ -21,16 +21,15 @@ package org.apache.samza.table.descriptors;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.config.Config;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-
 /**
  * Table descriptor for a caching table.
  * @param <K> type of the key in the cache
@@ -81,6 +80,10 @@ public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, Ca
     this.cache = cache;
   }
 
+  /**
+   * Retrieve user-defined table descriptors contained in this table
+   * @return table descriptors
+   */
   @Override
   public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
     return cache != null
@@ -88,33 +91,6 @@ public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, Ca
         : Arrays.asList(table);
   }
 
-  @Override
-  public TableSpec getTableSpec() {
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    if (cache != null) {
-      tableSpecConfig.put(CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
-    } else {
-      if (readTtl != null) {
-        tableSpecConfig.put(READ_TTL_MS, String.valueOf(readTtl.toMillis()));
-      }
-      if (writeTtl != null) {
-        tableSpecConfig.put(WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
-      }
-      if (cacheSize > 0) {
-        tableSpecConfig.put(CACHE_SIZE, String.valueOf(cacheSize));
-      }
-    }
-
-    tableSpecConfig.put(REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
-    tableSpecConfig.put(WRITE_AROUND, String.valueOf(isWriteAround));
-
-    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
-  }
-
   /**
    * Specify the TTL for each read access, ie. record is expired after
    * the TTL duration since last read access of each key.
@@ -159,9 +135,37 @@ public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, Ca
   }
 
   @Override
-  @VisibleForTesting
-  public void validate() {
-    super.validate();
+  public String getProviderFactoryClassName() {
+    return PROVIDER_FACTORY_CLASS_NAME;
+  }
+
+  @Override
+  public Map<String, String> toConfig(Config jobConfig) {
+
+    Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+
+    if (cache != null) {
+      addTableConfig(CACHE_TABLE_ID, cache.getTableId(), tableConfig);
+    } else {
+      if (readTtl != null) {
+        addTableConfig(READ_TTL_MS, String.valueOf(readTtl.toMillis()), tableConfig);
+      }
+      if (writeTtl != null) {
+        addTableConfig(WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()), tableConfig);
+      }
+      if (cacheSize > 0) {
+        addTableConfig(CACHE_SIZE, String.valueOf(cacheSize), tableConfig);
+      }
+    }
+
+    addTableConfig(REAL_TABLE_ID, table.getTableId(), tableConfig);
+    addTableConfig(WRITE_AROUND, String.valueOf(isWriteAround), tableConfig);
+
+    return Collections.unmodifiableMap(tableConfig);
+  }
+
+  @Override
+  protected void validate() {
     Preconditions.checkNotNull(table, "Actual table is required.");
     if (cache == null) {
       Preconditions.checkNotNull(readTtl, "readTtl must be specified.");

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
index 192bd7e..3965e6d 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
@@ -19,16 +19,16 @@
 
 package org.apache.samza.table.descriptors;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.config.Config;
 import org.apache.samza.table.utils.SerdeUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 
-
 /**
  * Table descriptor for Guava-based caching table.
  * @param <K> type of the key in the cache
@@ -43,24 +43,13 @@ public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, G
   private Cache<K, V> cache;
 
   /**
-   * {@inheritDoc}
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
    */
   public GuavaCacheTableDescriptor(String tableId) {
     super(tableId);
   }
 
-  @Override
-  public TableSpec getTableSpec() {
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    tableSpecConfig.put(GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
-
-    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
-  }
-
   /**
    * Specify a pre-configured Guava cache instance to be used for caching table.
    * @param cache Guava cache instance
@@ -72,8 +61,19 @@ public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, G
   }
 
   @Override
+  public String getProviderFactoryClassName() {
+    return PROVIDER_FACTORY_CLASS_NAME;
+  }
+
+  @Override
+  public Map<String, String> toConfig(Config jobConfig) {
+    Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+    addTableConfig(GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache), tableConfig);
+    return Collections.unmodifiableMap(tableConfig);
+  }
+
+  @Override
   protected void validate() {
-    super.validate();
     Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
index ec8cc67..8708b7f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
@@ -33,7 +33,8 @@ abstract public class HybridTableDescriptor<K, V, D extends HybridTableDescripto
     extends BaseTableDescriptor<K, V, D> {
 
   /**
-   * {@inheritDoc}
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
    */
   public HybridTableDescriptor(String tableId) {
     super(tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index dfcaea4..d194091 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -20,12 +20,18 @@ package org.apache.samza.table.descriptors;
 
 import com.google.common.base.Preconditions;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.storage.SideInputsProcessor;
-
+import org.apache.samza.table.utils.SerdeUtils;
 
 /**
  * Table descriptor for store backed tables.
@@ -37,23 +43,19 @@ import org.apache.samza.storage.SideInputsProcessor;
 abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
     extends BaseTableDescriptor<K, V, D> {
 
-  static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
-  static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
-  static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
+  public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
 
-  protected List<String> sideInputs;
-  protected SideInputsProcessor sideInputsProcessor;
-  protected boolean enableChangelog;
+  // Serdes for this table
+  protected final KVSerde<K, V> serde;
+
+  // changelog parameters
+  protected boolean enableChangelog; // Disabled by default
   protected String changelogStream;
   protected Integer changelogReplicationFactor;
 
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   */
-  public LocalTableDescriptor(String tableId) {
-    super(tableId);
-  }
+  // Side input parameters
+  protected List<String> sideInputs;
+  protected SideInputsProcessor sideInputsProcessor;
 
   /**
    * Constructs a table descriptor instance
@@ -61,7 +63,8 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
    * @param serde the serde for key and value
    */
   public LocalTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    super(tableId, serde);
+    super(tableId);
+    this.serde = serde;
   }
 
   public D withSideInputs(List<String> sideInputs) {
@@ -129,27 +132,69 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
     return (D) this;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * Note: Serdes are expected to be generated during configuration generation
+   * of the job node, which is not handled here.
+   */
   @Override
-  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
-    super.generateTableSpecConfig(tableSpecConfig);
+  public Map<String, String> toConfig(Config jobConfig) {
+
+    Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+
+    JavaTableConfig javaTableConfig = new JavaTableConfig(jobConfig);
 
-    tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
+    if (sideInputs != null && !sideInputs.isEmpty()) {
+      sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
+          "Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
+      String formattedSideInputs = String.join(",", sideInputs);
+      addStoreConfig("side.inputs", formattedSideInputs, tableConfig);
+      addStoreConfig("side.inputs.processor.serialized.instance",
+          SerdeUtils.serialize("Side Inputs Processor", sideInputsProcessor), tableConfig);
+    }
+
+    // Changelog configuration
     if (enableChangelog) {
-      if (changelogStream != null) {
-        tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
+      if (StringUtils.isEmpty(changelogStream)) {
+        String jobName = jobConfig.get("job.name");
+        Preconditions.checkNotNull(jobName, "job.name not found in job config");
+        String jobId = jobConfig.get("job.id");
+        Preconditions.checkNotNull(jobId, "job.id not found in job config");
+        changelogStream = String.format("%s-%s-table-%s", jobName, jobId, tableId);
       }
+
+      Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format(
+          "Changelog stream %s doesn't confirm to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
+      addStoreConfig("changelog", changelogStream, tableConfig);
+
       if (changelogReplicationFactor != null) {
-        tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
+        addStoreConfig("changelog.replication.factor", changelogReplicationFactor.toString(), tableConfig);
       }
     }
+
+    return Collections.unmodifiableMap(tableConfig);
+  }
+
+  /**
+   * Get side input stream names
+   * @return side inputs
+   */
+  public List<String> getSideInputs() {
+    return sideInputs;
   }
 
   /**
-   * Validate that this table descriptor is constructed properly
+   * Get the serde assigned to this {@link TableDescriptor}
+   *
+   * @return {@link KVSerde} used by this table
    */
+  public KVSerde<K, V> getSerde() {
+    return serde;
+  }
+
   @Override
   protected void validate() {
-    super.validate();
     if (sideInputs != null || sideInputsProcessor != null) {
       Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
           String.format("Invalid side input configuration for table: %s. " +
@@ -165,4 +210,18 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
     }
   }
 
+  /**
+   * Helper method to add a store level config item to table configuration
+   * @param key key of the config item
+   * @param value value of the config item
+   * @param tableConfig table configuration
+   */
+  protected void addStoreConfig(String key, String value, Map<String, String> tableConfig) {
+    tableConfig.put(String.format("stores.%s.%s", tableId, key), value);
+  }
+
+  private boolean isValidSystemStreamName(String name) {
+    return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 8b34ec9..5bed7d6 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -20,12 +20,12 @@
 package org.apache.samza.table.descriptors;
 
 import java.lang.reflect.Constructor;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.samza.SamzaException;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.config.Config;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
@@ -35,7 +35,6 @@ import org.apache.samza.util.RateLimiter;
 
 import com.google.common.base.Preconditions;
 
-
 /**
  * Table descriptor for remote store backed tables
  *
@@ -104,69 +103,6 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
   }
 
   /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   * @param serde the serde for key and value
-   */
-  public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    super(tableId, serde);
-  }
-
-  @Override
-  public TableSpec getTableSpec() {
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    // Serialize and store reader/writer functions
-    tableSpecConfig.put(READ_FN, SerdeUtils.serialize("read function", readFn));
-
-    if (writeFn != null) {
-      tableSpecConfig.put(WRITE_FN, SerdeUtils.serialize("write function", writeFn));
-    }
-
-    if (!tagCreditsMap.isEmpty()) {
-      RateLimiter defaultRateLimiter;
-      try {
-        Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
-        Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
-        defaultRateLimiter = ctor.newInstance(tagCreditsMap);
-      } catch (Exception ex) {
-        throw new SamzaException("Failed to create default rate limiter", ex);
-      }
-      tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter));
-    } else if (rateLimiter != null) {
-      tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
-    }
-
-    // Serialize the readCredit functions
-    if (readCreditFn != null) {
-      tableSpecConfig.put(READ_CREDIT_FN, SerdeUtils.serialize(
-          "read credit function", readCreditFn));
-    }
-    // Serialize the writeCredit functions
-    if (writeCreditFn != null) {
-      tableSpecConfig.put(WRITE_CREDIT_FN, SerdeUtils.serialize(
-          "write credit function", writeCreditFn));
-    }
-
-    if (readRetryPolicy != null) {
-      tableSpecConfig.put(READ_RETRY_POLICY, SerdeUtils.serialize(
-          "read retry policy", readRetryPolicy));
-    }
-
-    if (writeRetryPolicy != null) {
-      tableSpecConfig.put(WRITE_RETRY_POLICY, SerdeUtils.serialize(
-          "write retry policy", writeRetryPolicy));
-    }
-
-    tableSpecConfig.put(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
-
-    return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
-  }
-
-  /**
    * Use specified TableReadFunction with remote table and a retry policy.
    * @param readFn read function instance
    * @return this table descriptor instance
@@ -284,8 +220,60 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
   }
 
   @Override
+  public String getProviderFactoryClassName() {
+    return PROVIDER_FACTORY_CLASS_NAME;
+  }
+
+  @Override
+  public Map<String, String> toConfig(Config jobConfig) {
+
+    Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+
+    // Serialize and store reader/writer functions
+    addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
+
+    if (writeFn != null) {
+      addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
+    }
+
+    if (!tagCreditsMap.isEmpty()) {
+      RateLimiter defaultRateLimiter;
+      try {
+        Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
+        Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
+        defaultRateLimiter = ctor.newInstance(tagCreditsMap);
+      } catch (Exception ex) {
+        throw new SamzaException("Failed to create default rate limiter", ex);
+      }
+      addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
+    } else if (rateLimiter != null) {
+      addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
+    }
+
+    // Serialize the readCredit functions
+    if (readCreditFn != null) {
+      addTableConfig(READ_CREDIT_FN, SerdeUtils.serialize("read credit function", readCreditFn), tableConfig);
+    }
+    // Serialize the writeCredit functions
+    if (writeCreditFn != null) {
+      addTableConfig(WRITE_CREDIT_FN, SerdeUtils.serialize("write credit function", writeCreditFn), tableConfig);
+    }
+
+    if (readRetryPolicy != null) {
+      addTableConfig(READ_RETRY_POLICY, SerdeUtils.serialize("read retry policy", readRetryPolicy), tableConfig);
+    }
+
+    if (writeRetryPolicy != null) {
+      addTableConfig(WRITE_RETRY_POLICY, SerdeUtils.serialize("write retry policy", writeRetryPolicy), tableConfig);
+    }
+
+    addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig);
+
+    return Collections.unmodifiableMap(tableConfig);
+  }
+
+  @Override
   protected void validate() {
-    super.validate();
     Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
     Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
         "Only one of rateLimiter instance or read/write limits can be specified");

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
index 9798091..806d158 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -18,10 +18,13 @@
  */
 package org.apache.samza.table.descriptors;
 
+import java.util.Map;
+
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
 import org.apache.samza.context.TaskContext;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.InitableFunction;
@@ -72,4 +75,18 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
    */
   D withConfig(String key, String value);
 
+  /**
+   * Generate configuration for this table descriptor, the generated configuration
+   * should be the complete configuration for this table that can be directly
+   * included in the job configuration.
+   *
+   * Note: although the serdes may have already been set in this instance, their
+   * corresponding configuration needs to be generated centrally for consistency
+   * and efficiency reasons. Therefore the serde configuration for this table
+   * is expected to have already been generated and stored in the {@code jobConfig}.
+   *
+   * @param jobConfig job configuration
+   * @return table configuration
+   */
+  Map<String, String> toConfig(Config jobConfig);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java b/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java
new file mode 100644
index 0000000..db46ffd
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Map;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestBaseTableDescriptor {
+
+  private static final String TABLE_ID = "t1";
+
+  @Test
+  public void testMinimal() {
+    Map<String, String> tableConfig = createTableDescriptor(TABLE_ID)
+        .toConfig(new MapConfig());
+    Assert.assertEquals(1, tableConfig.size());
+  }
+
+  @Test
+  public void testProviderFactoryConfig() {
+    Map<String, String> tableConfig = createTableDescriptor(TABLE_ID)
+        .toConfig(new MapConfig());
+    Assert.assertEquals(1, tableConfig.size());
+    assertEquals("my.factory", "provider.factory", TABLE_ID, tableConfig);
+  }
+
+  @Test
+  public void testCustomConfig() {
+    Map<String, String> tableConfig = createTableDescriptor(TABLE_ID)
+        .withConfig("abc", "xyz")
+        .toConfig(new MapConfig());
+    Assert.assertEquals(2, tableConfig.size());
+    Assert.assertEquals("xyz", tableConfig.get("abc"));
+  }
+
+  private BaseTableDescriptor createTableDescriptor(String tableId) {
+    return new BaseTableDescriptor(tableId) {
+      @Override
+      public String getProviderFactoryClassName() {
+        return "my.factory";
+      }
+      @Override
+      protected void validate() {
+      }
+    };
+  }
+
+  private void assertEquals(String expectedValue, String key, String tableId, Map<String, String> config) {
+    String realKey = JavaTableConfig.buildKey(tableId, key);
+    Assert.assertEquals(expectedValue, config.get(realKey));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
index e57c957..2632f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -34,12 +34,12 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.TableImpl;
 import org.apache.samza.system.descriptors.SystemDescriptor;
@@ -119,9 +119,11 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
   @Override
   public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
     addTableDescriptor(tableDescriptor);
-    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
-    getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
-    return new TableImpl(baseTableDescriptor.getTableSpec());
+    if (tableDescriptor instanceof LocalTableDescriptor) {
+      LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+      getOrCreateTableSerdes(localTableDescriptor.getTableId(), localTableDescriptor.getSerde());
+    }
+    return new TableImpl(tableDescriptor);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
index b29cec1..b6ef1ef 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
@@ -22,7 +22,7 @@ import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.task.TaskFactory;
 
@@ -67,8 +67,10 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
   @Override
   public TaskApplicationDescriptor withTable(TableDescriptor tableDescriptor) {
     addTableDescriptor(tableDescriptor);
-    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
-    getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
+    if (tableDescriptor instanceof LocalTableDescriptor) {
+      LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+      getOrCreateTableSerdes(localTableDescriptor.getTableId(), localTableDescriptor.getSerde());
+    }
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
deleted file mode 100644
index 3a3005a..0000000
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.config;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-
-/**
- * A helper class for handling table configuration
- */
-public class JavaTableConfig extends MapConfig {
-
-  // Prefix
-  public static final String TABLES_PREFIX = "tables.";
-  public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s";
-
-  // Suffix
-  public static final String TABLE_PROVIDER_FACTORY_SUFFIX = ".provider.factory";
-
-  // Config keys
-  public static final String TABLE_PROVIDER_FACTORY = String.format("%s.provider.factory", TABLE_ID_PREFIX);
-  public static final String TABLE_KEY_SERDE = String.format("%s.key.serde", TABLE_ID_PREFIX);
-  public static final String TABLE_VALUE_SERDE = String.format("%s.value.serde", TABLE_ID_PREFIX);
-
-
-  public JavaTableConfig(Config config) {
-    super(config);
-  }
-
-  /**
-   * Get Id's of all tables
-   * @return list of table Id's
-   */
-  public List<String> getTableIds() {
-    Config subConfig = subset(TABLES_PREFIX, true);
-    Set<String> tableNames = subConfig.keySet().stream()
-        .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
-        .map(k -> k.replace(TABLE_PROVIDER_FACTORY_SUFFIX, ""))
-        .collect(Collectors.toSet());
-    return new LinkedList<>(tableNames);
-  }
-
-  /**
-   * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
-   * @param tableId Id of the table
-   * @return the {@link org.apache.samza.table.TableProviderFactory} class name
-   */
-  public String getTableProviderFactory(String tableId) {
-    return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
-  }
-
-  /**
-   * Get registry keys of key serde for this table
-   * @param tableId Id of the table
-   * @return serde retistry key
-   */
-  public String getKeySerde(String tableId) {
-    return get(String.format(TABLE_KEY_SERDE, tableId), null);
-  }
-
-  /**
-   * Get registry keys of value serde for this table
-   * @param tableId Id of the table
-   * @return serde retistry key
-   */
-  public String getValueSerde(String tableId) {
-    return get(String.format(TABLE_VALUE_SERDE, tableId), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 10a4215..0110551 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.samza.SamzaException;
@@ -43,12 +44,12 @@ import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,8 +123,7 @@ public class ExecutionPlanner {
     Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
     Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
 
-    Set<TableSpec> tables = appDesc.getTableDescriptors().stream()
-        .map(tableDescriptor -> ((BaseTableDescriptor) tableDescriptor).getTableSpec()).collect(Collectors.toSet());
+    Set<TableDescriptor> tables = appDesc.getTableDescriptors();
 
     // For this phase, we have a single job node for the whole DAG
     String jobName = config.get(JobConfig.JOB_NAME());
@@ -140,12 +140,15 @@ public class ExecutionPlanner {
     intermediateStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
 
     // Add tables
-    for (TableSpec table : tables) {
+    for (TableDescriptor table : tables) {
       jobGraph.addTable(table, node);
       // Add side-input streams (if any)
-      Iterable<String> sideInputs = ListUtils.emptyIfNull(table.getSideInputs());
-      for (String sideInput : sideInputs) {
-        jobGraph.addSideInputStream(getStreamSpec(sideInput, streamConfig));
+      if (table instanceof LocalTableDescriptor) {
+        LocalTableDescriptor localTable = (LocalTableDescriptor) table;
+        Iterable<String> sideInputs = ListUtils.emptyIfNull(localTable.getSideInputs());
+        for (String sideInput : sideInputs) {
+          jobGraph.addSideInputStream(getStreamSpec(sideInput, streamConfig));
+        }
       }
     }
 
@@ -208,6 +211,9 @@ public class ExecutionPlanner {
         OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(
             jobGraph.getApplicationDescriptorImpl().getInputOperators().values());
 
+    Map<String, TableDescriptor> tableDescriptors = jobGraph.getTables().stream()
+        .collect(Collectors.toMap(TableDescriptor::getTableId, Function.identity()));
+
     // Convert every group of input operator specs into a group of corresponding stream edges.
     List<StreamSet> streamSets = new ArrayList<>();
     for (OperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
@@ -218,11 +224,14 @@ public class ExecutionPlanner {
       // streams associated with the joined table (if any).
       if (joinOpSpec instanceof StreamTableJoinOperatorSpec) {
         StreamTableJoinOperatorSpec streamTableJoinOperatorSpec = (StreamTableJoinOperatorSpec) joinOpSpec;
-
-        Collection<String> sideInputs = ListUtils.emptyIfNull(streamTableJoinOperatorSpec.getTableSpec().getSideInputs());
-        Iterable<StreamEdge> sideInputStreams = sideInputs.stream().map(jobGraph::getStreamEdge)::iterator;
-        Iterable<StreamEdge> streams = streamSet.getStreamEdges();
-        streamSet = new StreamSet(streamSet.getSetId(), Iterables.concat(streams, sideInputStreams));
+        TableDescriptor tableDescriptor = tableDescriptors.get(streamTableJoinOperatorSpec.getTableId());
+        if (tableDescriptor instanceof LocalTableDescriptor) {
+          LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+          Collection<String> sideInputs = ListUtils.emptyIfNull(localTableDescriptor.getSideInputs());
+          Iterable<StreamEdge> sideInputStreams = sideInputs.stream().map(jobGraph::getStreamEdge)::iterator;
+          Iterable<StreamEdge> streams = streamSet.getStreamEdges();
+          streamSet = new StreamSet(streamSet.getSetId(), Iterables.concat(streams, sideInputStreams));
+        }
       }
 
       streamSets.add(streamSet);

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 7944dd3..0da1fd5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -36,7 +36,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
   private final Set<StreamEdge> outputStreams = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Set<StreamEdge> sideInputStreams = new HashSet<>();
-  private final Set<TableSpec> tables = new HashSet<>();
+  private final Set<TableDescriptor> tables = new HashSet<>();
   private final Config config;
   private final JobGraphJsonGenerator jsonGenerator;
   private final JobNodeConfigurationGenerator configGenerator;
@@ -150,9 +150,9 @@ import org.slf4j.LoggerFactory;
     intermediateStreams.add(edge);
   }
 
-  void addTable(TableSpec tableSpec, JobNode node) {
-    tables.add(tableSpec);
-    node.addTable(tableSpec);
+  void addTable(TableDescriptor tableDescriptor, JobNode node) {
+    tables.add(tableDescriptor);
+    node.addTable(tableDescriptor);
   }
 
   /**
@@ -237,9 +237,9 @@ import org.slf4j.LoggerFactory;
 
   /**
    * Returns the tables in the graph
-   * @return unmodifiable set of {@link TableSpec}
+   * @return unmodifiable set of {@link TableDescriptor}
    */
-  Set<TableSpec> getTables() {
+  Set<TableDescriptor> getTables() {
     return Collections.unmodifiableSet(tables);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 4a2a235..4b11174 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -36,7 +36,8 @@ import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -59,10 +60,8 @@ import org.codehaus.jackson.map.ObjectMapper;
   static final class TableSpecJson {
     @JsonProperty("id")
     String id;
-    @JsonProperty("tableProviderFactory")
-    String tableProviderFactory;
-    @JsonProperty("config")
-    Map<String, String> config;
+    @JsonProperty("providerFactory")
+    String providerFactory;
   }
 
   static final class StreamEdgeJson {
@@ -183,13 +182,13 @@ import org.codehaus.jackson.map.ObjectMapper;
     }
 
     if (spec instanceof StreamTableJoinOperatorSpec) {
-      TableSpec tableSpec = ((StreamTableJoinOperatorSpec) spec).getTableSpec();
-      map.put("tableId", tableSpec.getId());
+      String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
+      map.put("tableId", tableId);
     }
 
     if (spec instanceof StreamTableJoinOperatorSpec) {
-      TableSpec tableSpec = ((StreamTableJoinOperatorSpec) spec).getTableSpec();
-      map.put("tableId", tableSpec.getId());
+      String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
+      map.put("tableId", tableId);
     }
 
     if (spec instanceof JoinOperatorSpec) {
@@ -270,27 +269,15 @@ import org.codehaus.jackson.map.ObjectMapper;
     return edgeJson;
   }
 
-  /**
-   * Get or create the JSON POJO for a {@link TableSpec}
-   * @param tableSpec the {@link TableSpec}
-   * @param tableSpecs a map of tableId to {@link TableSpecJson}
-   * @return JSON representation of the {@link TableSpec}
-   */
-  private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) {
-    String tableId = tableSpec.getId();
-    return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableSpec));
+  private TableSpecJson buildTableJson(TableDescriptor tableDescriptor, Map<String, TableSpecJson> tableSpecs) {
+    String tableId = tableDescriptor.getTableId();
+    return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableDescriptor));
   }
 
-  /**
-   * Create the JSON POJO for a {@link TableSpec}
-   * @param tableSpec the {@link TableSpec}
-   * @return JSON representation of the {@link TableSpec}
-   */
-  private TableSpecJson buildTableJson(TableSpec tableSpec) {
+  private TableSpecJson buildTableJson(TableDescriptor tableDescriptor) {
     TableSpecJson tableSpecJson = new TableSpecJson();
-    tableSpecJson.id = tableSpec.getId();
-    tableSpecJson.tableProviderFactory = tableSpec.getTableProviderFactoryClassName();
-    tableSpecJson.config = tableSpec.getConfig();
+    tableSpecJson.id = tableDescriptor.getTableId();
+    tableSpecJson.providerFactory = ((BaseTableDescriptor) tableDescriptor).getProviderFactoryClassName();
     return tableSpecJson;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 82b4178..e4fbdba 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -35,7 +35,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +56,7 @@ public class JobNode {
   private final Map<String, StreamEdge> inEdges = new HashMap<>();
   private final Map<String, StreamEdge> outEdges = new HashMap<>();
   // Similarly, tables uses tableId as the key
-  private final Map<String, TableSpec> tables = new HashMap<>();
+  private final Map<String, TableDescriptor> tables = new HashMap<>();
   private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
 
   JobNode(String jobName, String jobId, Config config, ApplicationDescriptorImpl appDesc,
@@ -97,8 +97,8 @@ public class JobNode {
     outEdges.put(out.getStreamSpec().getId(), out);
   }
 
-  void addTable(TableSpec tableSpec) {
-    tables.put(tableSpec.getId(), tableSpec);
+  void addTable(TableDescriptor tableDescriptor) {
+    tables.put(tableDescriptor.getTableId(), tableDescriptor);
   }
 
   Map<String, StreamEdge> getInEdges() {
@@ -109,7 +109,7 @@ public class JobNode {
     return outEdges;
   }
 
-  Map<String, TableSpec> getTables() {
+  Map<String, TableDescriptor> getTables() {
     return tables;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 215177b..2fed979 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -50,7 +50,8 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.table.TableConfigGenerator;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.util.MathUtil;
 import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
@@ -93,7 +94,7 @@ import org.slf4j.LoggerFactory;
     Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
     Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
     List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
-    Map<String, TableSpec> reachableTables = getReachableTables(reachableOperators, jobNode);
+    Map<String, TableDescriptor> reachableTables = getReachableTables(reachableOperators, jobNode);
 
     // config passed by the JobPlanner. user-provided + system-stream descriptor config + misc. other config
     Config originalConfig = jobNode.getConfig();
@@ -141,7 +142,7 @@ import org.slf4j.LoggerFactory;
     return new JobConfig(mergeConfig(originalConfig, generatedConfig));
   }
 
-  private Map<String, TableSpec> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
+  private Map<String, TableDescriptor> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
     // TODO: Fix this in SAMZA-1893. For now, returning all tables for single-job execution plan
     return jobNode.getTables();
   }
@@ -207,22 +208,25 @@ import org.slf4j.LoggerFactory;
   }
 
   private void configureTables(Map<String, String> generatedConfig, Config originalConfig,
-      Map<String, TableSpec> tables, Set<String> inputs) {
+      Map<String, TableDescriptor> tables, Set<String> inputs) {
     generatedConfig.putAll(
-        TableConfigGenerator.generateConfigsForTableSpecs(
+        TableConfigGenerator.generate(
             new MapConfig(generatedConfig), new ArrayList<>(tables.values())));
 
     // Add side inputs to the inputs and mark the stream as bootstrap
-    tables.values().forEach(tableSpec -> {
-        List<String> sideInputs = tableSpec.getSideInputs();
-        if (sideInputs != null && !sideInputs.isEmpty()) {
-          sideInputs.stream()
-              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
-              .forEach(systemStream -> {
-                  inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
-                  generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
-                      systemStream.getSystem(), systemStream.getStream()), "true");
-                });
+    tables.values().forEach(tableDescriptor -> {
+        if (tableDescriptor instanceof LocalTableDescriptor) {
+          LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+          List<String> sideInputs = localTableDescriptor.getSideInputs();
+          if (sideInputs != null && !sideInputs.isEmpty()) {
+            sideInputs.stream()
+                .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
+                .forEach(systemStream -> {
+                    inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
+                    generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+                        systemStream.getSystem(), systemStream.getStream()), "true");
+                  });
+          }
         }
       });
   }
@@ -306,12 +310,12 @@ import org.slf4j.LoggerFactory;
 
     // set key and msg serdes for stores to the serde names generated above
     tableKeySerdes.forEach((tableId, serde) -> {
-        String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+        String keySerdeConfigKey = String.format(JavaTableConfig.STORE_KEY_SERDE, tableId);
         configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
       });
 
     tableMsgSerdes.forEach((tableId, serde) -> {
-        String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+        String valueSerdeConfigKey = String.format(JavaTableConfig.STORE_MSG_SERDE, tableId);
         configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
       });
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
index 123244b..70a5848 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
@@ -33,7 +33,6 @@ import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.TableSpec;
 
 
 /**
@@ -56,7 +55,7 @@ import org.apache.samza.table.TableSpec;
     Multimap<OperatorSpec, InputOperatorSpec> joinToInputOpSpecs = HashMultimap.create();
 
     // Create a getNextOpSpecs() function that emulates connections between every SendToTableOperatorSpec
-    // — which are terminal OperatorSpecs — and all StreamTableJoinOperatorSpecs referencing the same TableSpec.
+    // — which are terminal OperatorSpecs — and all StreamTableJoinOperatorSpecs referencing the same table.
     //
     // This is necessary to support Stream-Table Join scenarios because it allows us to associate streams behind
     // SendToTableOperatorSpecs with streams participating in Stream-Table Joins, a connection that would not be
@@ -96,14 +95,14 @@ import org.apache.samza.table.TableSpec;
    * {@code operatorSpecGraph}.
    *
    * Calling the returned function with any {@link SendToTableOperatorSpec} will return a collection of all
-   * {@link StreamTableJoinOperatorSpec}s that reference the same {@link TableSpec} as the specified
+   * {@link StreamTableJoinOperatorSpec}s that reference the same table as the specified
    * {@link SendToTableOperatorSpec}, as if they were actually connected.
    */
   private static Function<OperatorSpec, Iterable<OperatorSpec>> getCustomGetNextOpSpecs(
       Iterable<InputOperatorSpec> inputOpSpecs) {
 
     // Traverse operatorSpecGraph to create mapping between every SendToTableOperatorSpec and all
-    // StreamTableJoinOperatorSpecs referencing the same TableSpec.
+    // StreamTableJoinOperatorSpecs referencing the same table.
     TableJoinVisitor tableJoinVisitor = new TableJoinVisitor();
     for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
       traverse(inputOpSpec, tableJoinVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs());
@@ -113,7 +112,7 @@ import org.apache.samza.table.TableSpec;
         tableJoinVisitor.getSendToTableOpSpecToStreamTableJoinOpSpecs();
 
     return operatorSpec -> {
-      // If this is a SendToTableOperatorSpec, return all StreamTableJoinSpecs referencing the same TableSpec.
+      // If this is a SendToTableOperatorSpec, return all StreamTableJoinSpecs referencing the same table.
       // For all other types of operator specs, return the next registered operator specs.
       if (operatorSpec instanceof SendToTableOperatorSpec) {
         SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) operatorSpec;
@@ -145,21 +144,21 @@ import org.apache.samza.table.TableSpec;
 
   /**
    * An {@link OperatorSpec} visitor that records associations between every {@link SendToTableOperatorSpec}
-   * and all {@link StreamTableJoinOperatorSpec}s that reference the same {@link TableSpec}.
+   * and all {@link StreamTableJoinOperatorSpec}s that reference the same table.
    */
   private static class TableJoinVisitor implements Consumer<OperatorSpec> {
-    private final Multimap<TableSpec, SendToTableOperatorSpec> tableSpecToSendToTableOpSpecs = HashMultimap.create();
-    private final Multimap<TableSpec, StreamTableJoinOperatorSpec> tableSpecToStreamTableJoinOpSpecs = HashMultimap.create();
+    private final Multimap<String, SendToTableOperatorSpec> tableToSendToTableOpSpecs = HashMultimap.create();
+    private final Multimap<String, StreamTableJoinOperatorSpec> tableToStreamTableJoinOpSpecs = HashMultimap.create();
 
     @Override
     public void accept(OperatorSpec opSpec) {
-      // Record all SendToTableOperatorSpecs, StreamTableJoinOperatorSpecs, and their corresponding TableSpecs.
+      // Record all SendToTableOperatorSpecs, StreamTableJoinOperatorSpecs, and their corresponding tables.
       if (opSpec instanceof SendToTableOperatorSpec) {
         SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) opSpec;
-        tableSpecToSendToTableOpSpecs.put(sendToTableOperatorSpec.getTableSpec(), sendToTableOperatorSpec);
+        tableToSendToTableOpSpecs.put(sendToTableOperatorSpec.getTableId(), sendToTableOperatorSpec);
       } else if (opSpec instanceof StreamTableJoinOperatorSpec) {
         StreamTableJoinOperatorSpec streamTableJoinOpSpec = (StreamTableJoinOperatorSpec) opSpec;
-        tableSpecToStreamTableJoinOpSpecs.put(streamTableJoinOpSpec.getTableSpec(), streamTableJoinOpSpec);
+        tableToStreamTableJoinOpSpecs.put(streamTableJoinOpSpec.getTableId(), streamTableJoinOpSpec);
       }
     }
 
@@ -167,11 +166,11 @@ import org.apache.samza.table.TableSpec;
       Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableOpSpecToStreamTableJoinOpSpecs =
           HashMultimap.create();
 
-      // Map every SendToTableOperatorSpec to all StreamTableJoinOperatorSpecs referencing the same TableSpec.
-      for (TableSpec tableSpec : tableSpecToSendToTableOpSpecs.keySet()) {
-        Collection<SendToTableOperatorSpec> sendToTableOpSpecs = tableSpecToSendToTableOpSpecs.get(tableSpec);
+      // Map every SendToTableOperatorSpec to all StreamTableJoinOperatorSpecs referencing the same table.
+      for (String tableId : tableToSendToTableOpSpecs.keySet()) {
+        Collection<SendToTableOperatorSpec> sendToTableOpSpecs = tableToSendToTableOpSpecs.get(tableId);
         Collection<StreamTableJoinOperatorSpec> streamTableJoinOpSpecs =
-            tableSpecToStreamTableJoinOpSpecs.get(tableSpec);
+            tableToStreamTableJoinOpSpecs.get(tableId);
 
         for (SendToTableOperatorSpec sendToTableOpSpec : sendToTableOpSpecs) {
           sendToTableOpSpecToStreamTableJoinOpSpecs.putAll(sendToTableOpSpec, streamTableJoinOpSpecs);

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 0f43c5e..b652d68 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -50,7 +50,6 @@ import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
 
 
 /**
@@ -147,9 +146,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
       StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
     String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN);
-    TableSpec tableSpec = ((TableImpl) table).getTableSpec();
     StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
-        tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
+        ((TableImpl) table).getTableId(), (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
     this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
     return new MessageStreamImpl<>(this.streamAppDesc, joinOpSpec);
   }
@@ -183,7 +181,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   public <K, V> void sendTo(Table<KV<K, V>> table) {
     String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
     SendToTableOperatorSpec<K, V> op =
-        OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId);
+        OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableId(), opId);
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
index 8ceada0..654968b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
@@ -20,22 +20,26 @@ package org.apache.samza.operators;
 
 import java.io.Serializable;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TableDescriptor;
 
 
 /**
- * This class is the holder of a {@link TableSpec}
+ * This class is the holder of a table Id
  */
 public class TableImpl implements Table, Serializable {
 
-  private final TableSpec tableSpec;
+  private final String tableId;
 
-  public TableImpl(TableSpec tableSpec) {
-    this.tableSpec = tableSpec;
+  public TableImpl(String tableId) {
+    this.tableId = tableId;
   }
 
-  public TableSpec getTableSpec() {
-    return tableSpec;
+  public TableImpl(TableDescriptor tableDescriptor) {
+    this.tableId = tableDescriptor.getTableId();
+  }
+
+  public String getTableId() {
+    return tableId;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index be3e0a3..0d39c1b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
 
   SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
     this.sendToTableOpSpec = sendToTableOpSpec;
-    this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableSpec().getId());
+    this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index d89f2b3..96f07d1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -46,7 +46,7 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M
 
   StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
     this.joinOpSpec = joinOpSpec;
-    this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableSpec().getId());
+    this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 8d3ff60..ff3fe67 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -29,8 +29,6 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
-
 
 /**
  * Factory methods for creating {@link OperatorSpec} instances.
@@ -192,7 +190,7 @@ public class OperatorSpecs {
   /**
    * Creates a {@link StreamTableJoinOperatorSpec} with a join function.
    *
-   * @param tableSpec the table spec for the table on the right side of the join
+   * @param tableId the table Id for the table on the right side of the join
    * @param joinFn the user-defined join function to get join keys and results
    * @param opId the unique ID of the operator
    * @param <K> the type of join key
@@ -202,23 +200,23 @@ public class OperatorSpecs {
    * @return the {@link StreamTableJoinOperatorSpec}
    */
   public static <K, M, R, JM> StreamTableJoinOperatorSpec<K, M, R, JM> createStreamTableJoinOperatorSpec(
-      TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
-    return new StreamTableJoinOperatorSpec(tableSpec, joinFn, opId);
+      String tableId, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
+    return new StreamTableJoinOperatorSpec(tableId, joinFn, opId);
   }
 
   /**
    * Creates a {@link SendToTableOperatorSpec} with a key extractor and a value extractor function,
    * the type of incoming message is expected to be KV&#60;K, V&#62;.
    *
-   * @param tableSpec the table spec for the underlying table
+   * @param tableId the table Id for the underlying table
    * @param opId the unique ID of the operator
    * @param <K> the type of the table record key
    * @param <V> the type of the table record value
    * @return the {@link SendToTableOperatorSpec}
    */
   public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec(
-     TableSpec tableSpec, String opId) {
-    return new SendToTableOperatorSpec(tableSpec, opId);
+     String tableId, String opId) {
+    return new SendToTableOperatorSpec(tableId, opId);
   }
 
   /**