You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/20 16:19:33 UTC

[2/2] samza git commit: SAMZA-1671: sql: initial insert support for table destination

SAMZA-1671: sql: initial insert support for table destination

Table is another main IO abstraction in Samza; it supports reads and,
optionally, writes. For the tables that do support writes, Samza SQL
users should be able to write a query that writes to them. One example
is inserting into a database. The current code only supports inserting
into a stream. This change adds initial support for the table insert
operation.

Author: Peng Du <pd...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Srini P<sp...@linkedin.com>

Closes #465 from pdu-mn1/sql-insert-table


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2f4d0b95
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2f4d0b95
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2f4d0b95

Branch: refs/heads/master
Commit: 2f4d0b953c635e8676b83cc1a4a28213dc21a49d
Parents: 3cc2a05
Author: Peng Du <pd...@linkedin.com>
Authored: Fri Apr 20 09:18:31 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Apr 20 09:18:31 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/config/JavaTableConfig.java    |   2 +-
 .../sql/impl/ConfigBasedIOResolverFactory.java  | 122 ++++++++++++
 .../impl/ConfigBasedSourceResolverFactory.java  | 109 -----------
 .../samza/sql/impl/ConfigBasedUdfResolver.java  |   1 -
 .../samza/sql/interfaces/SourceResolver.java    |  44 -----
 .../sql/interfaces/SourceResolverFactory.java   |  36 ----
 .../samza/sql/interfaces/SqlIOConfig.java       | 136 +++++++++++++
 .../samza/sql/interfaces/SqlIOResolver.java     |  45 +++++
 .../sql/interfaces/SqlIOResolverFactory.java    |  36 ++++
 .../sql/interfaces/SqlSystemSourceConfig.java   | 129 ------------
 .../apache/samza/sql/planner/QueryPlanner.java  |  10 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   |  50 ++---
 .../sql/runner/SamzaSqlApplicationRunner.java   |  12 +-
 .../samza/sql/testutil/SamzaSqlQueryParser.java |  62 +++---
 .../samza/sql/translator/JoinTranslator.java    |  66 ++++---
 .../samza/sql/translator/QueryTranslator.java   |  39 +++-
 .../samza/sql/translator/ScanTranslator.java    |   8 +-
 .../apache/samza/sql/TestQueryTranslator.java   |   8 +-
 .../sql/TestSamzaSqlApplicationConfig.java      |  14 +-
 .../samza/sql/TestSamzaSqlQueryParser.java      |  14 +-
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |  69 +++++++
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |  34 +++-
 .../sql/testutil/TestIOResolverFactory.java     | 195 +++++++++++++++++++
 .../sql/testutil/TestSourceResolverFactory.java |  66 -------
 .../org/apache/samza/tools/SamzaSqlConsole.java |  28 +--
 25 files changed, 799 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
index 6cc3986..ed013c4 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -53,7 +53,7 @@ public class JavaTableConfig extends MapConfig {
     Config subConfig = subset(TABLES_PREFIX, true);
     Set<String> tableNames = subConfig.keySet().stream()
         .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
-        .map(k -> k.substring(0, k.indexOf(".")))
+        .map(k -> k.replace(TABLE_PROVIDER_FACTORY_SUFFIX, ""))
         .collect(Collectors.toSet());
     return new LinkedList<>(tableNames);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
new file mode 100644
index 0000000..0887dc4
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -0,0 +1,122 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * IO resolver implementation that uses static config to return a config corresponding to a system stream.
+ * This IO resolver implementation supports sources of type {systemName}.{streamName}[.$table]
+ * {systemName}.{streamName} indicates a stream
+ * {systemName}.{streamName}.$table indicates a table
+ */
+public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedIOResolverFactory.class);
+
+  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
+
+  @Override
+  public SqlIOResolver create(Config config) {
+    return new ConfigBasedIOResolver(config);
+  }
+
+  private class ConfigBasedIOResolver implements SqlIOResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+
+    public ConfigBasedIOResolver(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * Resolves a source of the form {systemName}.{streamName}[.$table] into its {@link SqlIOConfig}.
+     *
+     * @param source dot-separated source name
+     * @return the IO config for the source
+     * @throws SamzaException if the source is not of the expected format
+     */
+    @Override
+    public SqlIOConfig fetchSourceInfo(String source) {
+      String[] sourceComponents = source.split("\\.");
+      int endIdx = sourceComponents.length - 1;
+
+      // Validate before indexing: split() drops trailing empty strings, so a source such as "." yields no
+      // components and has to be rejected here instead of failing with an ArrayIndexOutOfBoundsException.
+      boolean invalidQuery;
+      if (sourceComponents.length == 2) {
+        invalidQuery = sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) ||
+            sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
+      } else {
+        invalidQuery = sourceComponents.length != 3 ||
+            !sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
+      }
+
+      if (invalidQuery) {
+        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
+            SAMZA_SQL_QUERY_TABLE_KEYWORD);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      // The system name comes first; the stream name is last, or sits just before a trailing "$table".
+      boolean isTable = isTable(sourceComponents);
+      String systemName = sourceComponents[0];
+      String streamName = sourceComponents[isTable ? endIdx - 1 : endIdx];
+
+      TableDescriptor tableDescriptor = null;
+      if (isTable) {
+        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
+            .withSerde(KVSerde.of(
+                new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+                new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+      }
+
+      return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sink) {
+      throw new NotImplementedException("No sink support in ConfigBasedIOResolver.");
+    }
+
+    private boolean isTable(String[] sourceComponents) {
+      return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
+    }
+
+    private Config fetchSystemConfigs(String systemName) {
+      return config.subset(systemName + ".");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
deleted file mode 100644
index 5348d3d..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.impl;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Source Resolver implementation that uses static config to return a config corresponding to a system stream.
- * This Source resolver implementation supports sources of type {systemName}.{streamName}[.$table]
- * {systemName}.{streamName} indicates a stream
- * {systemName}.{streamName}.$table indicates a table
- */
-public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedSourceResolverFactory.class);
-
-  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
-
-  @Override
-  public SourceResolver create(Config config) {
-    return new ConfigBasedSourceResolver(config);
-  }
-
-  private class ConfigBasedSourceResolver implements SourceResolver {
-    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
-    private final Config config;
-
-    public ConfigBasedSourceResolver(Config config) {
-      this.config = config;
-    }
-
-    @Override
-    public SqlSystemSourceConfig fetchSourceInfo(String source) {
-      String[] sourceComponents = source.split("\\.");
-      boolean isTable = false;
-
-      // This source resolver expects sources of format {systemName}.{streamName}[.$table]
-      //  * First source part is always system name.
-      //  * The last source part could be either a "$table" keyword or stream name. If it is "$table", then stream name
-      //    should be the one before the last source part.
-      int endIdx = sourceComponents.length - 1;
-      int streamIdx = endIdx;
-      boolean invalidQuery = false;
-
-      if (sourceComponents.length != 2) {
-        if (sourceComponents.length != 3 ||
-            !sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-          invalidQuery = true;
-        }
-      } else {
-        if (sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) ||
-            sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-          invalidQuery = true;
-        }
-      }
-
-      if (invalidQuery) {
-        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
-            SAMZA_SQL_QUERY_TABLE_KEYWORD);
-        LOG.error(msg);
-        throw new SamzaException(msg);
-      }
-
-      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-        isTable = true;
-        streamIdx = endIdx - 1;
-      }
-
-      String systemName = sourceComponents[0];
-      String streamName = sourceComponents[streamIdx];
-
-      return new SqlSystemSourceConfig(systemName, streamName, fetchSystemConfigs(systemName), isTable);
-    }
-
-    @Override
-    public boolean isTable(String sourceName) {
-      String[] sourceComponents = sourceName.split("\\.");
-      return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
-    }
-
-    private Config fetchSystemConfigs(String systemName) {
-      return config.subset(systemName + ".");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
index 412ff3b..a7eed84 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
deleted file mode 100644
index c161a0d..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-/**
- * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemSourceConfig} corresponding to the source.
- */
-public interface SourceResolver {
-  /**
-   * Returns the SystemStream config corresponding to the source name
-   * @param sourceName
-   *  source whose systemstreamconfig needs to be fetched.
-   * @return
-   *  System stream config corresponding to the source.
-   */
-  SqlSystemSourceConfig fetchSourceInfo(String sourceName);
-
-  /**
-   * Returns if a given source is a table. Different source resolvers could have different notations in the source
-   * name for denoting a table. Eg: system.stream.$table
-   * @param sourceName
-   *  source that needs to be checked if it is a table.
-   * @return
-   *  true if the source is a table, else false.
-   */
-  boolean isTable(String sourceName);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
deleted file mode 100644
index 274a6b1..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-import org.apache.samza.config.Config;
-
-
-/**
- * Factory that is used to create {@link SourceResolver}.
- */
-public interface SourceResolverFactory {
-
-  /**
-   * Create the {@link SourceResolver}. This is called during the application initialization.
-   * @param config config for the SourceResolver
-   * @return Returns the created {@link SourceResolver}
-   */
-  SourceResolver create(Config config);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
new file mode 100644
index 0000000..3a73e09
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configs associated with an IO resource. Both stream and table resources are supported.
+ */
+public class SqlIOConfig {
+
+  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+  private final String systemName;
+
+  private final String streamName;
+
+  private final String samzaRelConverterName;
+  private final SystemStream systemStream;
+
+  private final String source;
+  private final String relSchemaProviderName;
+
+  private final Config config;
+
+  private final List<String> sourceParts;
+
+  private final Optional<TableDescriptor> tableDescriptor;
+
+  public SqlIOConfig(String systemName, String streamName, Config systemConfig) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, null);
+  }
+
+  public SqlIOConfig(String systemName, String streamName, Config systemConfig, TableDescriptor tableDescriptor) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, tableDescriptor);
+  }
+
+  public SqlIOConfig(String systemName, String streamName, List<String> sourceParts,
+      Config systemConfig, TableDescriptor tableDescriptor) {
+    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.source = getSourceFromSourceParts(sourceParts);
+    this.sourceParts = sourceParts;
+    this.systemStream = new SystemStream(systemName, streamName);
+    this.tableDescriptor = Optional.ofNullable(tableDescriptor);
+
+    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
+    Validate.notEmpty(samzaRelConverterName,
+        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+
+    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
+
+    // Removing the Samza SQL specific configs to get the remaining Samza configs.
+    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
+    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
+
+    // Currently, only local table is supported. And it is assumed that all tables are local tables.
+    if (tableDescriptor != null) {
+      streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true");
+      streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest");
+    }
+
+    config = new MapConfig(streamConfigs);
+  }
+
+  public static String getSourceFromSourceParts(List<String> sourceParts) {
+    return Joiner.on(".").join(sourceParts);
+  }
+
+  public List<String> getSourceParts() {
+    return sourceParts;
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getSamzaRelConverterName() {
+    return samzaRelConverterName;
+  }
+
+  public String getRelSchemaProviderName() {
+    return relSchemaProviderName;
+  }
+
+  public SystemStream getSystemStream() {
+    return systemStream;
+  }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public String getSource() {
+    return source;
+  }
+
+  public Optional<TableDescriptor> getTableDescriptor() {
+    return tableDescriptor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java
new file mode 100644
index 0000000..b2c3cd7
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+
+/**
+ * IO Resolvers are used by Samza SQL application to fetch the {@link SqlIOConfig} corresponding
+ * to the input and output system, including both Samza stream and table systems.
+ */
+public interface SqlIOResolver {
+  /**
+   * Returns the SQL IO config corresponding to the source name
+   * @param sourceName
+   *  source whose IOConfig needs to be fetched.
+   * @return
+   *  IOConfig corresponding to the source.
+   */
+  SqlIOConfig fetchSourceInfo(String sourceName);
+
+  /**
+   * Returns the SQL IO config corresponding to the sink name
+   * @param sinkName
+   *  sink whose IOConfig needs to be fetched.
+   * @return
+   *  IOConfig corresponding to the sink.
+   */
+  SqlIOConfig fetchSinkInfo(String sinkName);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
new file mode 100644
index 0000000..6efa57c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory that is used to create {@link SqlIOResolver}.
+ */
+public interface SqlIOResolverFactory {
+
+  /**
+   * Create the {@link SqlIOResolver}. This is called during the application initialization.
+   * @param config config for the SqlIOResolver
+   * @return Returns the created {@link SqlIOResolver}
+   */
+  SqlIOResolver create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
deleted file mode 100644
index 02ec18a..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * Configs associated with a system source. Both streams and table sources are supported.
- * For now, only local tables are supported.
- */
-public class SqlSystemSourceConfig {
-
-  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
-  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
-
-  private final String systemName;
-
-  private final String streamName;
-
-  private final String samzaRelConverterName;
-  private final SystemStream systemStream;
-
-  private final String source;
-  private String relSchemaProviderName;
-
-  private Config config;
-
-  private List<String> sourceParts;
-
-  public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig) {
-    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, false);
-  }
-
-  public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig, boolean isTable) {
-    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, isTable);
-  }
-
-  public SqlSystemSourceConfig(String systemName, String streamName, List<String> sourceParts,
-      Config systemConfig, boolean isTable) {
-
-
-    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
-    this.systemName = systemName;
-    this.streamName = streamName;
-    this.source = getSourceFromSourceParts(sourceParts);
-    this.sourceParts = sourceParts;
-    this.systemStream = new SystemStream(systemName, streamName);
-
-    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
-    Validate.notEmpty(samzaRelConverterName,
-        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
-
-    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
-
-    // Removing the Samza SQL specific configs to get the remaining Samza configs.
-    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
-    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
-
-    // Currently, only local table is supported. And it is assumed that all tables are local tables.
-    if (isTable) {
-      streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true");
-      streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest");
-    }
-
-    config = new MapConfig(streamConfigs);
-  }
-
-  public static String getSourceFromSourceParts(List<String> sourceParts) {
-    return Joiner.on(".").join(sourceParts);
-  }
-
-  public List<String> getSourceParts() {
-    return sourceParts;
-  }
-
-  public String getSystemName() {
-    return systemName;
-  }
-
-  public String getStreamName() {
-    return streamName;
-  }
-
-  public String getSamzaRelConverterName() {
-    return samzaRelConverterName;
-  }
-
-  public String getRelSchemaProviderName() {
-    return relSchemaProviderName;
-  }
-
-  public SystemStream getSystemStream() {
-    return systemStream;
-  }
-
-  public Config getConfig() {
-    return config;
-  }
-
-  public String getSource() {
-    return source;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index f21eccf..8ebb885 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -55,7 +55,7 @@ import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,11 +72,11 @@ public class QueryPlanner {
   // Mapping between the source to the RelSchemaProvider corresponding to the source.
   private final Map<String, RelSchemaProvider> relSchemaProviders;
 
-  // Mapping between the source to the SqlSystemSourceConfig corresponding to the source.
-  private final Map<String, SqlSystemSourceConfig> systemStreamConfigBySource;
+  // Mapping between the source to the SqlIOConfig corresponding to the source.
+  private final Map<String, SqlIOConfig> systemStreamConfigBySource;
 
   public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
-      Map<String, SqlSystemSourceConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
+      Map<String, SqlIOConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
     this.relSchemaProviders = relSchemaProviders;
     this.systemStreamConfigBySource = systemStreamConfigBySource;
     this.udfMetadata = udfMetadata;
@@ -88,7 +88,7 @@ public class QueryPlanner {
       CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
 
-      for (SqlSystemSourceConfig ssc : systemStreamConfigBySource.values()) {
+      for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
         SchemaPlus previousLevelSchema = rootSchema;
         List<String> sourceParts = ssc.getSourceParts();
         RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index bcefae2..b7d9a59 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -40,9 +40,9 @@ import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.interfaces.UdfResolver;
 import org.apache.samza.sql.testutil.JsonUtil;
@@ -72,8 +72,8 @@ public class SamzaSqlApplicationConfig {
   public static final String CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN = "samza.sql.relSchemaProvider.%s.";
   public static final String CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN = "samza.sql.relConverter.%s.";
 
-  public static final String CFG_SOURCE_RESOLVER = "samza.sql.sourceResolver";
-  public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = "samza.sql.sourceResolver.%s.";
+  public static final String CFG_IO_RESOLVER = "samza.sql.ioResolver";
+  public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = "samza.sql.ioResolver.%s.";
 
   public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
   public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
@@ -85,13 +85,13 @@ public class SamzaSqlApplicationConfig {
   private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
   private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
 
-  private SourceResolver sourceResolver;
+  private SqlIOResolver ioResolver;
   private UdfResolver udfResolver;
 
   private final Collection<UdfMetadata> udfMetadata;
 
-  private final Map<String, SqlSystemSourceConfig> inputSystemStreamConfigBySource;
-  private final Map<String, SqlSystemSourceConfig> outputSystemStreamConfigsBySource;
+  private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
+  private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
 
   private final List<String> sql;
 
@@ -105,31 +105,31 @@ public class SamzaSqlApplicationConfig {
 
     queryInfo = fetchQueryInfo(sql);
 
-    sourceResolver = createSourceResolver(staticConfig);
+    ioResolver = createIOResolver(staticConfig);
 
     udfResolver = createUdfResolver(staticConfig);
     udfMetadata = udfResolver.getUdfs();
 
     inputSystemStreamConfigBySource = queryInfo.stream()
-        .map(QueryInfo::getInputSources)
+        .map(QueryInfo::getSources)
         .flatMap(Collection::stream)
-        .collect(Collectors.toMap(Function.identity(), sourceResolver::fetchSourceInfo));
+        .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
 
-    Set<SqlSystemSourceConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
+    Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
 
     outputSystemStreamConfigsBySource = queryInfo.stream()
-        .map(QueryInfo::getOutputSource)
-        .collect(Collectors.toMap(Function.identity(), x -> sourceResolver.fetchSourceInfo(x)));
+        .map(QueryInfo::getSink)
+        .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
     systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
 
     relSchemaProvidersBySource = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
+        .collect(Collectors.toMap(SqlIOConfig::getSource,
             x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
                 CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
                 (o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
 
     samzaRelConvertersBySource = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
+        .collect(Collectors.toMap(SqlIOConfig::getSource,
             x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
                 CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
                     relSchemaProvidersBySource.get(x.getSource()), c))));
@@ -183,11 +183,11 @@ public class SamzaSqlApplicationConfig {
     return JsonUtil.toJson(sqlStmts);
   }
 
-  public static SourceResolver createSourceResolver(Config config) {
-    String sourceResolveValue = config.get(CFG_SOURCE_RESOLVER);
-    Validate.notEmpty(sourceResolveValue, "sourceResolver config is not set or empty");
-    return initializePlugin("SourceResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
-        (o, c) -> ((SourceResolverFactory) o).create(c));
+  public static SqlIOResolver createIOResolver(Config config) {
+    String sourceResolveValue = config.get(CFG_IO_RESOLVER);
+    Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or empty");
+    return initializePlugin("SqlIOResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
+        (o, c) -> ((SqlIOResolverFactory) o).create(c));
   }
 
   private UdfResolver createUdfResolver(Map<String, String> config) {
@@ -234,11 +234,11 @@ public class SamzaSqlApplicationConfig {
     return udfMetadata;
   }
 
-  public Map<String, SqlSystemSourceConfig> getInputSystemStreamConfigBySource() {
+  public Map<String, SqlIOConfig> getInputSystemStreamConfigBySource() {
     return inputSystemStreamConfigBySource;
   }
 
-  public Map<String, SqlSystemSourceConfig> getOutputSystemStreamConfigsBySource() {
+  public Map<String, SqlIOConfig> getOutputSystemStreamConfigsBySource() {
     return outputSystemStreamConfigsBySource;
   }
 
@@ -250,8 +250,8 @@ public class SamzaSqlApplicationConfig {
     return relSchemaProvidersBySource;
   }
 
-  public SourceResolver getSourceResolver() {
-    return sourceResolver;
+  public SqlIOResolver getIoResolver() {
+    return ioResolver;
   }
 
   public long getWindowDurationMs() {

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index f54ca42..4497a7c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -31,8 +31,8 @@ import org.apache.samza.runtime.AbstractApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
   public static Config computeSamzaConfigs(Boolean localRunner, Config config) {
     Map<String, String> newConfig = new HashMap<>();
 
-    SourceResolver sourceResolver = SamzaSqlApplicationConfig.createSourceResolver(config);
+    SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config);
     // Parse the sql and find the input stream streams
     List<String> sqlStmts = SamzaSqlApplicationConfig.fetchSqlFromConfig(config);
 
@@ -81,14 +81,14 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlApplicationConfig.fetchQueryInfo(sqlStmts);
     for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
       // Populate stream to system mapping config for input and output system streams
-      for (String inputSource : query.getInputSources()) {
-        SqlSystemSourceConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
+      for (String inputSource : query.getSources()) {
+        SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(inputSource);
         newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
             inputSystemStreamConfig.getSystemName());
         newConfig.putAll(inputSystemStreamConfig.getConfig());
       }
 
-      SqlSystemSourceConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
+      SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(query.getSink());
       newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
           outputSystemStreamConfig.getSystemName());
       newConfig.putAll(outputSystemStreamConfig.getConfig());

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
index faf903a..39ea092 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -52,7 +52,7 @@ import org.apache.samza.SamzaException;
 
 
 /**
- * Utility class that is used to parse the Samza sql query to figure out the inputs, outputs etc..
+ * Utility class that is used to parse the Samza sql query to figure out the sources, sink, etc.
  */
 public class SamzaSqlQueryParser {
 
@@ -60,26 +60,26 @@ public class SamzaSqlQueryParser {
   }
 
   public static class QueryInfo {
-    private final List<String> inputSources;
+    private final List<String> sources;
     private String selectQuery;
-    private String outputSource;
+    private String sink;
 
-    public QueryInfo(String selectQuery, List<String> inputSources, String outputSource) {
+    public QueryInfo(String selectQuery, List<String> sources, String sink) {
       this.selectQuery = selectQuery;
-      this.outputSource = outputSource;
-      this.inputSources = inputSources;
+      this.sink = sink;
+      this.sources = sources;
     }
 
-    public List<String> getInputSources() {
-      return inputSources;
+    public List<String> getSources() {
+      return sources;
     }
 
     public String getSelectQuery() {
       return selectQuery;
     }
 
-    public String getOutputSource() {
-      return outputSource;
+    public String getSink() {
+      return sink;
     }
   }
 
@@ -99,16 +99,16 @@ public class SamzaSqlQueryParser {
       throw new SamzaException(e);
     }
 
-    String outputSource;
+    String sink;
     String selectQuery;
-    ArrayList<String> inputSources;
+    ArrayList<String> sources;
     if (sqlNode instanceof SqlInsert) {
       SqlInsert sqlInsert = ((SqlInsert) sqlNode);
-      outputSource = sqlInsert.getTargetTable().toString();
+      sink = sqlInsert.getTargetTable().toString();
       if (sqlInsert.getSource() instanceof SqlSelect) {
         SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
         selectQuery = m.group(2);
-        inputSources = getInputsFromSelectQuery(sqlSelect);
+        sources = getSourcesFromSelectQuery(sqlSelect);
       } else {
         throw new SamzaException("Sql query is not of the expected format");
       }
@@ -116,7 +116,7 @@ public class SamzaSqlQueryParser {
       throw new SamzaException("Sql query is not of the expected format");
     }
 
-    return new QueryInfo(selectQuery, inputSources, outputSource);
+    return new QueryInfo(selectQuery, sources, sink);
   }
 
   private static Planner createPlanner() {
@@ -146,38 +146,38 @@ public class SamzaSqlQueryParser {
     return Frameworks.getPlanner(frameworkConfig);
   }
 
-  private static ArrayList<String> getInputsFromSelectQuery(SqlSelect sqlSelect) {
-    ArrayList<String> input = new ArrayList<>();
-    getInput(sqlSelect.getFrom(), input);
-    if (input.size() < 1) {
+  private static ArrayList<String> getSourcesFromSelectQuery(SqlSelect sqlSelect) {
+    ArrayList<String> sources = new ArrayList<>();
+    getSource(sqlSelect.getFrom(), sources);
+    if (sources.size() < 1) {
       throw new SamzaException("Unsupported query " + sqlSelect);
     }
 
-    return input;
+    return sources;
   }
 
-  private static void getInput(SqlNode node, ArrayList<String> inputSourceList) {
+  private static void getSource(SqlNode node, ArrayList<String> sourceList) {
     if (node instanceof SqlJoin) {
       SqlJoin joinNode = (SqlJoin) node;
-      ArrayList<String> inputsLeft = new ArrayList<>();
-      ArrayList<String> inputsRight = new ArrayList<>();
-      getInput(joinNode.getLeft(), inputsLeft);
-      getInput(joinNode.getRight(), inputsRight);
+      ArrayList<String> sourcesLeft = new ArrayList<>();
+      ArrayList<String> sourcesRight = new ArrayList<>();
+      getSource(joinNode.getLeft(), sourcesLeft);
+      getSource(joinNode.getRight(), sourcesRight);
 
-      inputSourceList.addAll(inputsLeft);
-      inputSourceList.addAll(inputsRight);
+      sourceList.addAll(sourcesLeft);
+      sourceList.addAll(sourcesRight);
     } else if (node instanceof SqlIdentifier) {
-      inputSourceList.add(node.toString());
+      sourceList.add(node.toString());
     } else if (node instanceof SqlBasicCall) {
       SqlBasicCall basicCall = ((SqlBasicCall) node);
       if (basicCall.getOperator() instanceof SqlAsOperator) {
-        getInput(basicCall.operand(0), inputSourceList);
+        getSource(basicCall.operand(0), sourceList);
       } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
-        inputSourceList.addAll(getInputsFromSelectQuery(basicCall.operand(0)));
+        sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
         return;
       }
     } else if (node instanceof SqlSelect) {
-      getInput(((SqlSelect) node).getFrom(), inputSourceList);
+      getSource(((SqlSelect) node).getFrom(), sourceList);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 899ca41..6df3421 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -21,6 +21,7 @@ package org.apache.samza.sql.translator;
 
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
@@ -42,15 +43,13 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey;
-import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
 
 
 /**
@@ -75,11 +74,11 @@ class JoinTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
   private int joinId;
-  private SourceResolver sourceResolver;
+  private SqlIOResolver ioResolver;
 
-  JoinTranslator(int joinId, SourceResolver sourceResolver) {
+  JoinTranslator(int joinId, SqlIOResolver ioResolver) {
     this.joinId = joinId;
-    this.sourceResolver = sourceResolver;
+    this.ioResolver = ioResolver;
   }
 
   void translate(final LogicalJoin join, final TranslatorContext context) {
@@ -95,11 +94,7 @@ class JoinTranslator {
     populateStreamAndTableKeyIds(((RexCall) join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds,
         tableKeyIds);
 
-    JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
-    SamzaSqlRelMessageSerde relMsgSerde =
-        (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-
-    Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, relMsgSerde, join, context);
+    Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, join, context);
 
     MessageStream<SamzaSqlRelMessage> inputStream =
         isTablePosOnRight ?
@@ -117,13 +112,16 @@ class JoinTranslator {
         new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
             tableFieldNames);
 
+    Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+    Serde<SamzaSqlRelMessage> valueSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+
     // Always re-partition the messages from the input stream by the composite key and then join the messages
     // with the table.
     MessageStream<SamzaSqlRelMessage> outputStream =
         inputStream
             .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
                 m -> m,
-                KVSerde.of(keySerde, relMsgSerde),
+                KVSerde.of(keySerde, valueSerde),
                 "stream_" + joinId)
             .map(KV::getValue)
             .join(table, joinFn);
@@ -249,34 +247,48 @@ class JoinTranslator {
         SqlExplainLevel.EXPPLAN_ATTRIBUTES);
   }
 
+  private SqlIOConfig resolveSourceConfig(RelNode relNode) {
+    String sourceName = String.join(".", relNode.getTable().getQualifiedName());
+    SqlIOConfig sourceConfig = ioResolver.fetchSourceInfo(sourceName);
+    if (sourceConfig == null) {
+      throw new SamzaException("Unsupported source found in join statement: " + sourceName);
+    }
+    return sourceConfig;
+  }
+
   private boolean isTable(RelNode relNode) {
     // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
     // stream-table-table join, the left side of the join is join output, which we always
     // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan.
-    return relNode instanceof EnumerableTableScan &&
-        sourceResolver.isTable(String.join(".", relNode.getTable().getQualifiedName()));
+    if (relNode instanceof EnumerableTableScan) {
+      return resolveSourceConfig(relNode).getTableDescriptor().isPresent();
+    } else {
+      return false;
+    }
   }
 
-  private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, Serde keySerde, Serde relMsgSerde,
-      LogicalJoin join, TranslatorContext context) {
-    MessageStream<SamzaSqlRelMessage> inputTable =
-        isTablePosOnRight ?
-            context.getMessageStream(join.getRight().getId()) : context.getMessageStream(join.getLeft().getId());
+  private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, LogicalJoin join, TranslatorContext context) {
+    RelNode relNode = isTablePosOnRight ? join.getRight() : join.getLeft();
+
+    MessageStream<SamzaSqlRelMessage> relOutputStream = context.getMessageStream(relNode.getId());
+
+    SqlIOConfig sourceConfig = resolveSourceConfig(relNode);
+
+    if (!sourceConfig.getTableDescriptor().isPresent()) {
+      String errMsg = "Failed to resolve table source in join operation: node=" + relNode;
+      log.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
 
     // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
     // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
     Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
-        context.getStreamGraph()
-            .getTable(new RocksDbTableDescriptor("table_" + joinId)
-                .withSerde(KVSerde.of(keySerde, relMsgSerde)));
+        context.getStreamGraph().getTable(sourceConfig.getTableDescriptor().get());
 
-    inputTable
+    relOutputStream
         .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
         .sendTo(table);
 
     return table;
   }
-
-  private void logStringAndTableJoinKeys(List<String> fieldNames, List<Integer> fieldIds) {
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index c8d55e8..eda73a7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.sql.translator;
 
+import java.util.Optional;
+
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelShuttleImpl;
@@ -27,18 +29,23 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.table.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -47,6 +54,7 @@ import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
  * It then walks the relational graph and then populates the Samza's {@link StreamGraph} accordingly.
  */
 public class QueryTranslator {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
 
   private final ScanTranslator scanTranslator;
   private final SamzaSqlApplicationConfig sqlConfig;
@@ -65,6 +73,7 @@ public class QueryTranslator {
     final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
     final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext);
     final RelNode node = relRoot.project();
+    final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
 
     node.accept(new RelShuttleImpl() {
       int windowId = 0;
@@ -95,8 +104,7 @@ public class QueryTranslator {
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
         joinId++;
-        SourceResolver sourceResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getSourceResolver();
-        new JoinTranslator(joinId, sourceResolver).translate(join, context);
+        new JoinTranslator(joinId, ioResolver).translate(join, context);
         return node;
       }
 
@@ -109,12 +117,23 @@ public class QueryTranslator {
       }
     });
 
-    SqlSystemSourceConfig outputSystemConfig =
-        sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
-    SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource());
-    MessageStreamImpl<SamzaSqlRelMessage> stream =
-        (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
+    String sink = queryInfo.getSink();
+    SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
+    SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getSink());
+    MessageStreamImpl<SamzaSqlRelMessage> stream = (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
     MessageStream<KV<Object, Object>> outputStream = stream.map(samzaMsgConverter::convertToSamzaMessage);
-    outputStream.sendTo(streamGraph.getOutputStream(outputSystemConfig.getStreamName()));
+
+    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
+    if (!tableDescriptor.isPresent()) {
+      outputStream.sendTo(streamGraph.getOutputStream(sinkConfig.getStreamName()));
+    } else {
+      Table outputTable = streamGraph.getTable(tableDescriptor.get());
+      if (outputTable == null) {
+        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      outputStream.sendTo(outputTable);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 13300f7..1f9ed52 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 
 
 /**
@@ -38,9 +38,9 @@ import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 class ScanTranslator {
 
   private final Map<String, SamzaRelConverter> relMsgConverters;
-  private final Map<String, SqlSystemSourceConfig> systemStreamConfig;
+  private final Map<String, SqlIOConfig> systemStreamConfig;
 
-  ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemSourceConfig> ssc) {
+  ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc) {
     relMsgConverters = converters;
     this.systemStreamConfig = ssc;
   }
@@ -48,7 +48,7 @@ class ScanTranslator {
   void translate(final TableScan tableScan, final TranslatorContext context) {
     StreamGraph streamGraph = context.getStreamGraph();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
-    String sourceName = SqlSystemSourceConfig.getSourceFromSourceParts(tableNameParts);
+    String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
 
     Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
     SamzaRelConverter converter = relMsgConverters.get(sourceName);

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 3365923..de0ecf1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -28,7 +28,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
@@ -313,10 +313,10 @@ public class TestQueryTranslator {
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableInnerJoinWithMissingStream() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String configSourceResolverDomain =
+    String configIOResolverDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
-    config.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedSourceResolverFactory.class.getName());
+    config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedIOResolverFactory.class.getName());
     String sql =
         "Insert into testavro.enrichedPageViewTopic"
             + " select p.name as profileName, pv.pageKey"

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
index 0804a6d..0d48c56 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
@@ -62,18 +62,18 @@ public class TestSamzaSqlApplicationConfig {
     // Pass
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
     new SamzaSqlApplicationConfig(new MapConfig(config));
-    testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER);
+    testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
     testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
 
-    String configSourceResolverDomain =
+    String configIOResolverDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
-    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "testavro");
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", "testavro");
 
-    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
+    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
 
     // Configs for the unused system "log" is not mandatory.
-    String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "log");
-    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", "log");
+    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
   }
 
   private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
index 0bfd721..24faf4b 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
@@ -31,10 +31,10 @@ public class TestSamzaSqlQueryParser {
   @Test
   public void testParseQuery() {
     QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
-    Assert.assertEquals("log.foo", queryInfo.getOutputSource());
+    Assert.assertEquals("log.foo", queryInfo.getSink());
     Assert.assertEquals(queryInfo.getSelectQuery(), "select * from tracking.bar", queryInfo.getSelectQuery());
-    Assert.assertEquals(1, queryInfo.getInputSources().size());
-    Assert.assertEquals("tracking.bar", queryInfo.getInputSources().get(0));
+    Assert.assertEquals(1, queryInfo.getSources().size());
+    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
   }
 
   @Test
@@ -46,10 +46,10 @@ public class TestSamzaSqlQueryParser {
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
-    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getOutputSource());
-    Assert.assertEquals(2, queryInfo.getInputSources().size());
-    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getInputSources().get(0));
-    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getInputSources().get(1));
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+    Assert.assertEquals(2, queryInfo.getSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
new file mode 100644
index 0000000..c4cacbd
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.e2e;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.testutil.TestIOResolverFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * End-to-end tests that run a SQL insert into the test table destination and check how
+ * many records reached {@link TestIOResolverFactory.TestTable}.
+ */
+public class TestSamzaSqlTable {
+  private static final int NUM_MESSAGES = 20;
+
+  /** Inserts {@code id} and {@code name} without designating a key column. */
+  @Test
+  public void testEndToEnd() throws Exception {
+    assertAllMessagesInserted(
+        "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1");
+  }
+
+  /** Same insert, but with {@code id} aliased to {@code __key__}. */
+  @Test
+  public void testEndToEndWithKey() throws Exception {
+    assertAllMessagesInserted(
+        "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1");
+  }
+
+  /**
+   * Empties the shared record map, runs the statement to completion, and asserts that the
+   * map then holds one record per configured message.
+   *
+   * @param sql the single SQL statement to run
+   * @throws Exception propagated from the runner or config setup
+   */
+  private void assertAllMessagesInserted(String sql) throws Exception {
+    TestIOResolverFactory.TestTable.records.clear();
+
+    Map<String, String> jobConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(NUM_MESSAGES);
+    List<String> statements = Arrays.asList(sql);
+    jobConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(statements));
+
+    new SamzaSqlApplicationRunner(true, new MapConfig(jobConfigs)).runAndWaitForFinish();
+
+    Assert.assertEquals(NUM_MESSAGES, TestIOResolverFactory.TestTable.records.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 208625d..14e2243 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -37,9 +37,9 @@ import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
-import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
@@ -52,6 +52,7 @@ import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 public class SamzaSqlTestConfig {
 
   public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+  public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
 
   public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
     return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
@@ -76,11 +77,11 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
-    String configSourceResolverDomain =
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+    String configIOResolverDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
-    staticConfigs.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        TestSourceResolverFactory.class.getName());
+    staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        TestIOResolverFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
     String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
@@ -91,8 +92,8 @@ public class SamzaSqlTestConfig {
             MyTestArrayUdf.class.getName()));
 
     String avroSystemConfigPrefix =
-        String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
-    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
     staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
     staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
         String.valueOf(numberOfMessages));
@@ -101,14 +102,23 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
         String.valueOf(windowDurationMs / 2));
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String avroSamzaToRelMsgConverterDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
     staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
         AvroRelConverterFactory.class.getName());
 
+    String testDbSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TestIOResolverFactory.TEST_DB_SYSTEM);
+    staticConfigs.put(testDbSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        AvroRelConverterFactory.class.getName());
+
     String configAvroRelSchemaProviderDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
     staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -141,6 +151,10 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
         "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
 
+    staticConfigs.put(
+        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+            TestIOResolverFactory.TEST_DB_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
+
     staticConfigs.putAll(props);
 
     return staticConfigs;

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
new file mode 100644
index 0000000..bbe2a7e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -0,0 +1,233 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Test-only {@link SqlIOResolverFactory}. Its resolver attaches a table descriptor to any
+ * source or sink whose last name component is the {@code $table} keyword.
+ */
+public class TestIOResolverFactory implements SqlIOResolverFactory {
+  public static final String TEST_DB_SYSTEM = "testDb";
+  public static final String TEST_TABLE_ID = "testDbId";
+
+  /**
+   * Creates a resolver backed by the given config.
+   *
+   * @param config config the resolver takes per-system subsets from
+   * @return a new resolver instance
+   */
+  @Override
+  public SqlIOResolver create(Config config) {
+    return new TestIOResolver(config);
+  }
+
+  /** Descriptor for a sink table whose spec points at {@link TestTableProviderFactory}. */
+  static class TestTableDescriptor extends BaseTableDescriptor {
+    protected TestTableDescriptor(String tableId) {
+      super(tableId);
+    }
+
+    @Override
+    public String getTableId() {
+      return tableId;
+    }
+
+    /**
+     * @return a spec with no-op key and value serdes and an empty config map
+     */
+    @Override
+    public TableSpec getTableSpec() {
+      return new TableSpec(tableId, KVSerde.of(new NoOpSerde(), new NoOpSerde()), TestTableProviderFactory.class.getName(), new HashMap<>());
+    }
+  }
+
+  /**
+   * Table stub that keeps written entries in the static {@link #records} map so tests can
+   * inspect them. Reads and bulk writes are not implemented.
+   */
+  public static class TestTable implements ReadWriteTable {
+    // Shared by every instance; tests clear it before running a job.
+    // NOTE(review): not synchronized -- presumably written from a single task thread; confirm.
+    public static Map<Object, Object> records = new HashMap<>();
+
+    @Override
+    public Object get(Object key) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public Map getAll(List keys) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Stores the entry. A {@code null} key is replaced by a fresh {@code Object}, which is
+     * equal only to itself, so keyless records can never overwrite one another.
+     */
+    @Override
+    public void put(Object key, Object value) {
+      Object effectiveKey = (key != null) ? key : new Object();
+      records.put(effectiveKey, value);
+    }
+
+    @Override
+    public void delete(Object key) {
+      records.remove(key);
+    }
+
+    /** Removes only the given keys; entries under other keys are kept. */
+    @Override
+    public void deleteAll(List keys) {
+      records.keySet().removeAll(keys);
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void putAll(List entries) {
+      throw new NotImplementedException();
+    }
+  }
+
+  /** Factory referenced by {@link TestTableDescriptor#getTableSpec()}. */
+  public static class TestTableProviderFactory implements TableProviderFactory {
+    @Override
+    public TableProvider getTableProvider(TableSpec tableSpec) {
+      return new TestTableProvider();
+    }
+  }
+
+  /** Provider that hands out {@link TestTable} instances and contributes no extra config. */
+  static class TestTableProvider implements TableProvider {
+    @Override
+    public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    }
+
+    @Override
+    public Table getTable() {
+      return new TestTable();
+    }
+
+    @Override
+    public Map<String, String> generateConfig(Map<String, String> config) {
+      return new HashMap<>();
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  /**
+   * Resolver that splits a dot-separated name into its components and caches one table
+   * descriptor per name ending in {@code $table}.
+   */
+  private static class TestIOResolver implements SqlIOResolver {
+    private static final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+
+    public TestIOResolver(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * Builds the IO config for a dot-separated source or sink name.
+     *
+     * @param ioName dot-separated name; the first component is used as the system
+     * @param isSink whether the name is being resolved as a query sink
+     * @return config built from the system, the stream component, all name parts, the system's
+     *         config subset, and a table descriptor ({@code null} unless the name ends in {@code $table})
+     */
+    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
+      String[] sourceComponents = ioName.split("\\.");
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      TableDescriptor tableDescriptor = null;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        // The keyword itself is not the stream name; use the component just before it.
+        streamIdx = endIdx - 1;
+
+        tableDescriptor = tableDescMap.get(ioName);
+
+        if (tableDescriptor == null) {
+          if (isSink) {
+            tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
+          } else {
+            tableDescriptor = new RocksDbTableDescriptor("InputTable-" + ioName)
+                .withSerde(KVSerde.of(
+                    new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+                    new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+          }
+          tableDescMap.put(ioName, tableDescriptor);
+        }
+      }
+
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String sourceName) {
+      return fetchIOInfo(sourceName, false);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sinkName) {
+      return fetchIOInfo(sinkName, true);
+    }
+  }
+}