You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/09/20 21:22:51 UTC

[1/2] samza git commit: Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input

Repository: samza
Updated Branches:
  refs/heads/master db6996ed9 -> dec16392d


http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index c9f59e6..e5d3659 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -22,7 +22,9 @@ package org.apache.samza.sql.translator;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
@@ -43,6 +45,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+
+
 public class TestQueryTranslator {
 
   // Helper functions to validate the cloned copies of TranslatorContext and SamzaSqlExecutionContext
@@ -79,14 +84,20 @@ public class TestQueryTranslator {
   public void testTranslate() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10");
+        "Insert into testavro.outputTopic(id) select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(descriptor -> { },samzaConfig);
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
 
-    translator.translate(queryInfo, appDesc);
+    translator.translate(queryInfo.get(0), appDesc);
     OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -128,17 +139,20 @@ public class TestQueryTranslator {
   public void testTranslateComplex() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1");
-//    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-//        "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1 "
-//            + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value");
+        "Insert into testavro.outputTopic(string_value) select Flatten(array_values) from testavro.COMPLEX1");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -163,14 +177,21 @@ public class TestQueryTranslator {
   public void testTranslateSubQuery() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)");
+        "Insert into testavro.outputTopic(string_value, id) select Flatten(a), id "
+            + " from (select id, array_values a, string_value s from testavro.COMPLEX1)");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -195,115 +216,151 @@ public class TestQueryTranslator {
   public void testTranslateStreamTableJoinWithoutJoinOperator() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
             + " where p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithFullJoinOperator() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " full join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = IllegalStateException.class)
   public void testTranslateStreamTableJoinWithSelfJoinOperator() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName)"
             + " select p1.name as profileName"
             + " from testavro.PROFILE.`$table` as p1"
             + " join testavro.PROFILE.`$table` as p2"
             + " on p1.id = p2.id";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithThetaCondition() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id <> pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableCrossJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithAndLiteralCondition() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId and p.name = 'John'";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithSubQuery() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " where exists "
@@ -311,83 +368,113 @@ public class TestQueryTranslator {
             + " where p.id = pv.profileId)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateTableTableJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW.`$table` as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamStreamJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateJoinWithIncorrectLeftJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW.`$table` as pv"
             + " left join testavro.PROFILE as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateJoinWithIncorrectRightJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " right join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -398,55 +485,73 @@ public class TestQueryTranslator {
     config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
         ConfigBasedIOResolverFactory.class.getName());
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableInnerJoinWithUdf() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on MyTest(p.id) = MyTest(pv.profileId)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test
   public void testTranslateStreamTableInnerJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -487,19 +592,25 @@ public class TestQueryTranslator {
   public void testTranslateStreamTableLeftJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " left join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
 
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
@@ -541,18 +652,24 @@ public class TestQueryTranslator {
   public void testTranslateStreamTableRightJoin() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PROFILE.`$table` as p"
             + " right join testavro.PAGEVIEW as pv"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
 
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
@@ -594,19 +711,25 @@ public class TestQueryTranslator {
   public void testTranslateGroupBy() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.pageViewCountTopic"
+        "Insert into testavro.pageViewCountTopic(jobName, pageKey, `count`)"
             + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
             + " from testavro.PAGEVIEW as pv"
             + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
             + " group by (pv.pageKey)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     Assert.assertEquals(1, specGraph.getInputOperators().size());
@@ -619,16 +742,22 @@ public class TestQueryTranslator {
   public void testTranslateGroupByWithSumAggregator() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.pageViewCountTopic"
+        "Insert into testavro.pageViewCountTopic(jobName, pageKey, `sum`)"
             + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`"
             + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
             + " group by (pv.pageKey)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index c211f03..919c91a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -75,7 +75,28 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+  }
+
+  @Test
+  public void testEndToEndWithProjection() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -97,9 +118,9 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
 
     LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic "
-            + "select Flatten(array_values) as string_value, id, bytes_value, fixed_value "
-            + "from testavro.COMPLEX1";
+        "Insert into testavro.outputTopic(string_value, id, bytes_value, fixed_value, float_value) "
+            + " select Flatten(array_values) as string_value, id, bytes_value, fixed_value, float_value "
+            + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -121,7 +142,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 =
-        "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
+        "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -142,7 +163,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + "select id, MyTest(id) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -166,7 +188,11 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)";
+    String sql1 =
+        "Insert into testavro.outputTopic(id) "
+            + "select id "
+            + "from testavro.SIMPLE1 "
+            + "where RegexMatch('.*4', name)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -186,7 +212,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     staticConfigs.putAll(configs);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId";
@@ -215,7 +242,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     staticConfigs.putAll(configs);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName, p.address as profileAddress "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId";
@@ -249,7 +277,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     staticConfigs.putAll(configs);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId "
@@ -282,7 +311,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
             + " on pv.profileId = p.id";
@@ -311,7 +341,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "left join testavro.PROFILE.`$table` as p "
             + " on pv.profileId = p.id";
@@ -340,7 +371,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "right join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId";
@@ -369,7 +401,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName, c.name as companyName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
             + " on p.id = pv.profileId "
@@ -399,7 +432,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName, c.name as companyName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
             + " on p.id = pv.profileId "


[2/2] samza git commit: Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input

Posted by sr...@apache.org.
Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input

This PR has the following changes:
- Let QueryTranslator take Calcite IR as input
- Include 'INSERT INTO' sql statement for Calcite plan
- Basic DSLConverter Framework with SamzaSQL dialect as an example
- Some fixes to stream-table join wrt Serde

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srinivasulu <sp...@linkedin.com>, Weiqing <wi...@linkedin.com>

Closes #630 from atoomula/dsl3 and squashes the following commits:

93c66cee [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
21c0175b [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
15a1e9fb [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
5bf0c7e1 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
98cd9777 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
63a66fb1 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
6794b512 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
c9d434a9 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
94e53b64 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
30c76ed9 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dec16392
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dec16392
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dec16392

Branch: refs/heads/master
Commit: dec16392de2f5d323b6b1b3acf8de1689038f44d
Parents: db6996e
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Sep 20 14:22:38 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Sep 20 14:22:38 2018 -0700

----------------------------------------------------------------------
 .../samza/sql/data/RexToJavaCompiler.java       |   5 +-
 .../samza/sql/dsl/SamzaSqlDslConverter.java     |  96 ++++++
 .../sql/dsl/SamzaSqlDslConverterFactory.java    |  33 ++
 .../samza/sql/interfaces/DslConverter.java      |  37 ++
 .../sql/interfaces/DslConverterFactory.java     |  36 ++
 .../samza/sql/interfaces/SamzaSqlDriver.java    |  56 +++
 .../interfaces/SamzaSqlJavaTypeFactoryImpl.java |  72 ++++
 .../samza/sql/runner/SamzaSqlApplication.java   |  30 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   | 117 ++++---
 .../sql/runner/SamzaSqlApplicationRunner.java   |  41 ++-
 .../samza/sql/testutil/SamzaSqlQueryParser.java |  21 +-
 .../samza/sql/translator/JoinTranslator.java    |   1 +
 .../samza/sql/translator/ModifyTranslator.java  | 117 +++++++
 .../samza/sql/translator/QueryTranslator.java   |  90 ++---
 .../samza/sql/translator/ScanTranslator.java    |  10 +-
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |   4 +-
 .../runner/TestSamzaSqlApplicationConfig.java   |  49 ++-
 .../runner/TestSamzaSqlApplicationRunner.java   |   2 +-
 .../samza/sql/system/TestAvroSystemFactory.java |   3 +-
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |   3 +
 .../sql/testutil/TestSamzaSqlFileParser.java    |   1 +
 .../sql/translator/TestQueryTranslator.java     | 345 +++++++++++++------
 .../test/samzasql/TestSamzaSqlEndToEnd.java     |  64 +++-
 23 files changed, 948 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
index 21c81a8..1cfa95f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -49,6 +49,7 @@ import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
 import org.apache.calcite.util.Pair;
 import org.apache.samza.SamzaException;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.commons.compiler.CompilerFactoryFactory;
 import org.codehaus.commons.compiler.IClassBodyEvaluator;
@@ -114,11 +115,11 @@ public class RexToJavaCompiler {
     final ParameterExpression root = DataContext.ROOT;
     final ParameterExpression inputValues = Expressions.parameter(Object[].class, "inputValues");
     final ParameterExpression outputValues = Expressions.parameter(Object[].class, "outputValues");
-    final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+    final JavaTypeFactoryImpl javaTypeFactory = new SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
 
     // public void execute(Object[] inputValues, Object[] outputValues)
     final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList.of(
-        Pair.<org.apache.calcite.linq4j.tree.Expression, PhysType>of(
+        Pair.of(
             Expressions.variable(Object[].class, "inputValues"),
             PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false))));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
new file mode 100644
index 0000000..4ec6f4a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.dsl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.planner.QueryPlanner;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SqlFileParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
+
+public class SamzaSqlDslConverter implements DslConverter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlDslConverter.class);
+
+  private final Config config;
+
+  SamzaSqlDslConverter(Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public Collection<RelRoot> convertDsl(String dsl) {
+    // TODO: Introduce an API to parse a dsl string and return one or more sql statements
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
+    QueryPlanner planner =
+        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
+            sqlConfig.getUdfMetadata());
+
+    List<RelRoot> relRoots = new LinkedList<>();
+    for (String sql: sqlStmts) {
+      relRoots.add(planner.plan(sql));
+    }
+    return relRoots;
+  }
+
+  public static List<SamzaSqlQueryParser.QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
+    return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
+  }
+
+  public static List<String> fetchSqlFromConfig(Map<String, String> config) {
+    List<String> sql;
+    if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) {
+      String sqlValue = config.get(CFG_SQL_STMT);
+      sql = Collections.singletonList(sqlValue);
+    } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) {
+      sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON));
+    } else if (config.containsKey(CFG_SQL_FILE)) {
+      String sqlFile = config.get(CFG_SQL_FILE);
+      sql = SqlFileParser.parseSqlFile(sqlFile);
+    } else {
+      String msg = "Config doesn't contain the SQL that needs to be executed.";
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    return sql;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java
new file mode 100644
index 0000000..5176453
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.dsl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.interfaces.DslConverterFactory;
+
+
+public class SamzaSqlDslConverterFactory implements DslConverterFactory {
+
+  @Override
+  public DslConverter create(Config config) {
+    return new SamzaSqlDslConverter(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
new file mode 100644
index 0000000..fc2ca8e
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.util.Collection;
+import org.apache.calcite.rel.RelRoot;
+
+
+/**
+ * Samza SQL Application uses {@link DslConverter} to convert the input dsl to Calcite logical plan.
+ */
+public interface DslConverter {
+
+  /**
+   * Convert the dsl into the Calcite logical plan.
+   * @return List of Root nodes of the Calcite logical plan.
+   * If DSL represents multiple SQL statements. You might return root nodes one for each SQL statement.
+   */
+  Collection<RelRoot> convertDsl(String dsl);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java
new file mode 100644
index 0000000..d42a96f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory that is used to create {@link DslConverter}
+ */
+public interface DslConverterFactory {
+
+  /**
+   * Create a {@link DslConverter} given the config
+   * @param config config needed to create the {@link DslConverter}
+   * @return {@link DslConverter} object created.
+   */
+  DslConverter create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java
new file mode 100644
index 0000000..5c86df9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.sql.interfaces;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.ConnectStringParser;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.Driver;
+
+
+/**
+ * Calcite JDBC driver for SamzaSQL which takes in a {@link JavaTypeFactory}
+ */
+public class SamzaSqlDriver extends Driver {
+
+  private JavaTypeFactory typeFactory;
+
+  public SamzaSqlDriver(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  @Override
+  public Connection connect(String url, Properties info) throws SQLException {
+    if (!acceptsURL(url)) {
+      return null;
+    }
+    final String prefix = getConnectStringPrefix();
+    assert url.startsWith(prefix);
+    final String urlSuffix = url.substring(prefix.length());
+    final Properties info2 = ConnectStringParser.parse(urlSuffix, info);
+    final AvaticaConnection connection =
+        ((CalciteFactory) factory).newConnection(this, factory, url, info2, null, typeFactory);
+    handler.onConnectionInit(connection);
+    return connection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
new file mode 100644
index 0000000..50001c6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.sql.interfaces;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * Calcite does validation of projected field types in select statement with the output schema types. If one of the
+ * projected fields is an UDF with return type of {@link Object} or any other java type not defined in
+ * {@link JavaToSqlTypeConversionRules}, using the default {@link JavaTypeFactoryImpl} results in validation failure.
+ * Hence, extending {@link JavaTypeFactoryImpl} to make Calcite validation work with all output types of Samza SQL UDFs.
+ */
+public class SamzaSqlJavaTypeFactoryImpl
+    extends JavaTypeFactoryImpl {
+
+  public SamzaSqlJavaTypeFactoryImpl() {
+    this(RelDataTypeSystem.DEFAULT);
+  }
+
+  public SamzaSqlJavaTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+    super(typeSystem);
+  }
+
+  @Override
+  public RelDataType toSql(RelDataType type) {
+    return toSql(this, type);
+  }
+
+  /** Converts a type in Java format to a SQL-oriented type. */
+  public static RelDataType toSql(final RelDataTypeFactory typeFactory,
+      RelDataType type) {
+    if (type instanceof RelRecordType) {
+      return typeFactory.createStructType(
+          Lists.transform(type.getFieldList(), a0 -> toSql(typeFactory, a0.getType())),
+          type.getFieldNames());
+    }
+    if (type instanceof JavaType) {
+      SqlTypeName typeName = JavaToSqlTypeConversionRules.instance().lookup(((JavaType) type).getJavaClass());
+      // For unknown sql type names, return ANY sql type to make Calcite validation not fail.
+      if (typeName == null) {
+        typeName = SqlTypeName.ANY;
+      }
+      return typeFactory.createTypeWithNullability(
+          typeFactory.createSqlType(typeName),
+          type.isNullable());
+    } else {
+      return JavaTypeFactoryImpl.toSql(typeFactory, type);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 9a871d7..fd1a2a8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -19,10 +19,14 @@
 
 package org.apache.samza.sql.runner;
 
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.translator.QueryTranslator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,12 +42,26 @@ public class SamzaSqlApplication implements StreamApplication {
   @Override
   public void describe(StreamApplicationDescriptor appDesc) {
     try {
-      SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(appDesc.getConfig());
+      // TODO: Introduce an API to return a dsl string containing one or more sql statements.
+      List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDesc.getConfig());
+
+      // 1. Get Calcite plan
+      Set<String> inputSystemStreams = new HashSet<>();
+      Set<String> outputSystemStreams = new HashSet<>();
+
+      Collection<RelRoot> relRoots =
+          SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDesc.getConfig(),
+              inputSystemStreams, outputSystemStreams);
+
+      // 2. Populate configs
+      SamzaSqlApplicationConfig sqlConfig =
+          new SamzaSqlApplicationConfig(appDesc.getConfig(), inputSystemStreams, outputSystemStreams);
+
+      // 3. Translate Calcite plan to Samza stream operators
       QueryTranslator queryTranslator = new QueryTranslator(sqlConfig);
-      List<SamzaSqlQueryParser.QueryInfo> queries = sqlConfig.getQueryInfo();
-      for (SamzaSqlQueryParser.QueryInfo query : queries) {
-        LOG.info("Translating the query {} to samza stream graph", query.getSelectQuery());
-        queryTranslator.translate(query, appDesc);
+      for (RelRoot relRoot : relRoots) {
+        LOG.info("Translating relRoot {} to samza stream graph", relRoot);
+        queryTranslator.translate(relRoot, appDesc);
       }
     } catch (RuntimeException e) {
       LOG.error("SamzaSqlApplication threw exception.", e);

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 997312f..415cfdd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -20,7 +20,6 @@
 package org.apache.samza.sql.runner;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -30,12 +29,18 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.interfaces.DslConverterFactory;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -47,9 +52,6 @@ import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.interfaces.UdfResolver;
 import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.ReflectionUtils;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
-import org.apache.samza.sql.testutil.SqlFileParser;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,37 +94,25 @@ public class SamzaSqlApplicationConfig {
 
   private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
   private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
-
-  private final List<String> sql;
-
-  private final List<QueryInfo> queryInfo;
+  private final Map<String, SqlIOConfig> systemStreamConfigsBySource;
 
   private final long windowDurationMs;
 
-  public SamzaSqlApplicationConfig(Config staticConfig) {
-
-    sql = fetchSqlFromConfig(staticConfig);
-
-    queryInfo = fetchQueryInfo(sql);
+  public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams,
+      Set<String> outputSystemStreams) {
 
     ioResolver = createIOResolver(staticConfig);
 
-    udfResolver = createUdfResolver(staticConfig);
-    udfMetadata = udfResolver.getUdfs();
+    inputSystemStreamConfigBySource = inputSystemStreams.stream()
+         .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
 
-    inputSystemStreamConfigBySource = queryInfo.stream()
-        .map(QueryInfo::getSources)
-        .flatMap(Collection::stream)
-        .distinct()
-        .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo));
+    outputSystemStreamConfigsBySource = outputSystemStreams.stream()
+         .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
 
-    Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
+    systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource);
+    systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource);
 
-    outputSystemStreamConfigsBySource = queryInfo.stream()
-        .map(QueryInfo::getSink)
-        .distinct()
-        .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo));
-    systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
+    Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(systemStreamConfigsBySource.values());
 
     relSchemaProvidersBySource = systemStreamConfigs.stream()
         .collect(Collectors.toMap(SqlIOConfig::getSource,
@@ -136,6 +126,9 @@ public class SamzaSqlApplicationConfig {
                 CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
                     relSchemaProvidersBySource.get(x.getSource()), c))));
 
+    udfResolver = createUdfResolver(staticConfig);
+    udfMetadata = udfResolver.getUdfs();
+
     windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
   }
 
@@ -151,30 +144,7 @@ public class SamzaSqlApplicationConfig {
     return factoryInvoker.apply(factory, pluginConfig);
   }
 
-  public static List<QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
-    return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
-  }
-
-  public static List<String> fetchSqlFromConfig(Map<String, String> config) {
-    List<String> sql;
-    if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) {
-      String sqlValue = config.get(CFG_SQL_STMT);
-      sql = Collections.singletonList(sqlValue);
-    } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) {
-      sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON));
-    } else if (config.containsKey(CFG_SQL_FILE)) {
-      String sqlFile = config.get(CFG_SQL_FILE);
-      sql = SqlFileParser.parseSqlFile(sqlFile);
-    } else {
-      String msg = "Config doesn't contain the SQL that needs to be executed.";
-      LOG.error(msg);
-      throw new SamzaException(msg);
-    }
-
-    return sql;
-  }
-
-  private static List<String> deserializeSqlStmts(String value) {
+  public static List<String> deserializeSqlStmts(String value) {
     Validate.notEmpty(value, "json Value is not set or empty");
     return JsonUtil.fromJson(value, new TypeReference<List<String>>() {
     });
@@ -224,12 +194,45 @@ public class SamzaSqlApplicationConfig {
     return ret;
   }
 
-  public List<String> getSql() {
-    return sql;
+  public static Collection<RelRoot> populateSystemStreamsAndGetRelRoots(List<String> dslStmts, Config config,
+      Set<String> inputSystemStreams, Set<String> outputSystemStreams) {
+    // TODO: Get the converter factory based on the file type. Create abstraction around this.
+    DslConverterFactory dslConverterFactory = new SamzaSqlDslConverterFactory();
+    DslConverter dslConverter = dslConverterFactory.create(config);
+
+    Collection<RelRoot> relRoots = dslConverter.convertDsl(String.join("\n", dslStmts));
+
+    for (RelRoot relRoot : relRoots) {
+      SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams);
+    }
+
+    return relRoots;
+  }
+
+  private static void populateSystemStreams(RelNode relNode, Set<String> inputSystemStreams,
+      Set<String> outputSystemStreams) {
+    if (relNode instanceof TableModify) {
+      outputSystemStreams.add(getSystemStreamName(relNode));
+    } else {
+      if (relNode instanceof BiRel) {
+        BiRel biRelNode = (BiRel) relNode;
+        populateSystemStreams(biRelNode.getLeft(), inputSystemStreams, outputSystemStreams);
+        populateSystemStreams(biRelNode.getRight(), inputSystemStreams, outputSystemStreams);
+      } else {
+        if (relNode.getTable() != null) {
+          inputSystemStreams.add(getSystemStreamName(relNode));
+        }
+      }
+    }
+     List<RelNode> relNodes = relNode.getInputs();
+    if (relNodes == null || relNodes.isEmpty()) {
+      return;
+    }
+    relNodes.forEach(node -> populateSystemStreams(node, inputSystemStreams, outputSystemStreams));
   }
 
-  public List<QueryInfo> getQueryInfo() {
-    return queryInfo;
+  private static String getSystemStreamName(RelNode relNode) {
+    return relNode.getTable().getQualifiedName().stream().map(Object::toString).collect(Collectors.joining("."));
   }
 
   public Collection<UdfMetadata> getUdfMetadata() {
@@ -244,6 +247,10 @@ public class SamzaSqlApplicationConfig {
     return outputSystemStreamConfigsBySource;
   }
 
+  public Map<String, SqlIOConfig> getSystemStreamConfigsBySource() {
+    return systemStreamConfigsBySource;
+  }
+
   public Map<String, SamzaRelConverter> getSamzaRelConverters() {
     return samzaRelConvertersBySource;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 027fd23..cad032f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -21,20 +21,21 @@ package org.apache.samza.sql.runner;
 
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,25 +64,31 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
   public static Config computeSamzaConfigs(Boolean localRunner, Config config) {
     Map<String, String> newConfig = new HashMap<>();
 
-    SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config);
-    // Parse the sql and find the input stream streams
-    List<String> sqlStmts = SamzaSqlApplicationConfig.fetchSqlFromConfig(config);
+    // TODO: Introduce an API to return a dsl string containing one or more sql statements
+    List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config);
 
     // This is needed because the SQL file may not be available in all the node managers.
-    String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(sqlStmts);
+    String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(dslStmts);
     newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson);
 
-    List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlApplicationConfig.fetchQueryInfo(sqlStmts);
-    for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
-      // Populate stream to system mapping config for input and output system streams
-      for (String inputSource : query.getSources()) {
-        SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(inputSource);
-        newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
-            inputSystemStreamConfig.getSystemName());
-        newConfig.putAll(inputSystemStreamConfig.getConfig());
-      }
-
-      SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(query.getSink());
+    Set<String> inputSystemStreams = new HashSet<>();
+    Set<String> outputSystemStreams = new HashSet<>();
+
+    SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, config,
+        inputSystemStreams, outputSystemStreams);
+
+    SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config);
+
+    // Populate stream to system mapping config for input and output system streams
+    for (String source : inputSystemStreams) {
+      SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(source);
+      newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
+          inputSystemStreamConfig.getSystemName());
+      newConfig.putAll(inputSystemStreamConfig.getConfig());
+    }
+
+    for (String sink : outputSystemStreams) {
+      SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(sink);
       newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
           outputSystemStreamConfig.getSystemName());
       newConfig.putAll(outputSystemStreamConfig.getConfig());

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
index 39ea092..643c82f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -24,9 +24,11 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.plan.Contexts;
@@ -49,6 +51,8 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
+import org.apache.samza.sql.interfaces.SamzaSqlDriver;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
 
 
 /**
@@ -63,11 +67,13 @@ public class SamzaSqlQueryParser {
     private final List<String> sources;
     private String selectQuery;
     private String sink;
+    private String sql;
 
-    public QueryInfo(String selectQuery, List<String> sources, String sink) {
+    public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) {
       this.selectQuery = selectQuery;
       this.sink = sink;
       this.sources = sources;
+      this.sql = sql;
     }
 
     public List<String> getSources() {
@@ -81,6 +87,10 @@ public class SamzaSqlQueryParser {
     public String getSink() {
       return sink;
     }
+
+    public String getSql() {
+      return sql;
+    }
   }
 
   public static QueryInfo parseQuery(String sql) {
@@ -116,14 +126,18 @@ public class SamzaSqlQueryParser {
       throw new SamzaException("Sql query is not of the expected format");
     }
 
-    return new QueryInfo(selectQuery, sources, sink);
+    return new QueryInfo(selectQuery, sources, sink, sql);
   }
 
   private static Planner createPlanner() {
     Connection connection;
     SchemaPlus rootSchema;
     try {
-      connection = DriverManager.getConnection("jdbc:calcite:");
+      JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl();
+      SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory);
+      DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:"));
+      DriverManager.registerDriver(driver);
+      connection = driver.connect("jdbc:calcite:", new Properties());
       CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
       rootSchema = calciteConnection.getRootSchema();
     } catch (SQLException e) {
@@ -174,7 +188,6 @@ public class SamzaSqlQueryParser {
         getSource(basicCall.operand(0), sourceList);
       } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
         sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
-        return;
       }
     } else if (node instanceof SqlSelect) {
       getSource(((SqlSelect) node).getFrom(), sourceList);

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 7071b39..ac2c64d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -127,6 +127,7 @@ class JoinTranslator {
                 "stream_" + joinId)
             .map(KV::getValue)
             .join(table, joinFn);
+    // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn);
 
     context.registerMessageStream(join.getId(), outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
new file mode 100644
index 0000000..965338f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -0,0 +1,117 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Translator to translate the TableModify in relational graph to the corresponding output streams in the StreamGraph
+ * implementation
+ */
+class ModifyTranslator {
+
+  private final Map<String, SamzaRelConverter> relMsgConverters;
+  private final Map<String, SqlIOConfig> systemStreamConfig;
+
+  ModifyTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc) {
+    relMsgConverters = converters;
+    this.systemStreamConfig = ssc;
+  }
+
+  // OutputMapFunction converts SamzaSqlRelMessage to SamzaMessage in KV format
+  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
+    // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
+    // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
+    // initialization.
+    private transient SamzaRelConverter samzaMsgConverter;
+    private final String outputTopic;
+
+    OutputMapFunction(String outputTopic) {
+      this.outputTopic = outputTopic;
+    }
+
+    @Override
+    public void init(Config config, TaskContext taskContext) {
+      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
+      this.samzaMsgConverter = context.getMsgConverter(outputTopic);
+    }
+
+    @Override
+    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
+      return this.samzaMsgConverter.convertToSamzaMessage(message);
+    }
+  }
+
+  void translate(final TableModify tableModify, final TranslatorContext context) {
+    StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
+    List<String> tableNameParts = tableModify.getTable().getQualifiedName();
+    String targetName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
+
+    Validate.isTrue(relMsgConverters.containsKey(targetName), String.format("Unknown source %s", targetName));
+
+    SqlIOConfig sinkConfig = systemStreamConfig.get(targetName);
+
+    final String systemName = sinkConfig.getSystemName();
+    final String streamName = sinkConfig.getStreamName();
+
+    KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+    DelegatingSystemDescriptor
+        sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+    GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamName, noOpKVSerde);
+
+    MessageStreamImpl<SamzaSqlRelMessage> stream =
+        (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId());
+    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(targetName));
+
+    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
+    if (!tableDescriptor.isPresent()) {
+      outputStream.sendTo(streamAppDesc.getOutputStream(osd));
+    } else {
+      Table outputTable = streamAppDesc.getTable(tableDescriptor.get());
+      if (outputTable == null) {
+        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
+        throw new SamzaException(msg);
+      }
+      outputStream.sendTo(outputTable);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index fe4d8da..3a35b97 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -20,10 +20,10 @@
 package org.apache.samza.sql.translator;
 
 import java.util.Map;
-import java.util.Optional;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
@@ -33,27 +33,13 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.table.Table;
 import org.apache.samza.task.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -62,54 +48,56 @@ import org.slf4j.LoggerFactory;
  * It then walks the relational graph and then populates the Samza's {@link StreamApplicationDescriptor} accordingly.
  */
 public class QueryTranslator {
-  private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
-
   private final ScanTranslator scanTranslator;
+  private final ModifyTranslator modifyTranslator;
   private final SamzaSqlApplicationConfig sqlConfig;
   private final Map<String, SamzaRelConverter> converters;
 
-  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
-    private transient SamzaRelConverter samzaMsgConverter;
-    private final String outputTopic;
-
-    OutputMapFunction(String outputTopic) {
-      this.outputTopic = outputTopic;
-    }
-
-    @Override
-    public void init(Config config, TaskContext taskContext) {
-      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
-      this.samzaMsgConverter = context.getMsgConverter(outputTopic);
-    }
-
-    @Override
-    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
-      return this.samzaMsgConverter.convertToSamzaMessage(message);
-    }
-  }
-
   public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
     this.sqlConfig = sqlConfig;
     scanTranslator =
         new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource());
+    modifyTranslator =
+        new ModifyTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getOutputSystemStreamConfigsBySource());
     this.converters = sqlConfig.getSamzaRelConverters();
   }
 
   public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
     QueryPlanner planner =
-        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
+        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
             sqlConfig.getUdfMetadata());
+    final RelRoot relRoot = planner.plan(queryInfo.getSql());
+    translate(relRoot, appDesc);
+  }
+
+  public void translate(RelRoot relRoot, StreamApplicationDescriptor appDesc) {
     final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
-    final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
     final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
-    final RelNode node = relRoot.project();
     final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
+    final RelNode node = relRoot.project();
 
     node.accept(new RelShuttleImpl() {
       int windowId = 0;
       int joinId = 0;
 
       @Override
+      public RelNode visit(RelNode relNode) {
+        if (relNode instanceof TableModify) {
+          return visit((TableModify) relNode);
+        }
+        return super.visit(relNode);
+      }
+
+      private RelNode visit(TableModify modify) {
+        if (!modify.isInsert()) {
+          throw new SamzaException("Not a supported operation: " + modify.toString());
+        }
+        RelNode node = super.visit(modify);
+        modifyTranslator.translate(modify, context);
+        return node;
+      }
+
+      @Override
       public RelNode visit(TableScan scan) {
         RelNode node = super.visit(scan);
         scanTranslator.translate(scan, context);
@@ -147,28 +135,6 @@ public class QueryTranslator {
       }
     });
 
-    String sink = queryInfo.getSink();
-    SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
-    MessageStreamImpl<SamzaSqlRelMessage> stream = (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
-    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink));
-
-    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
-    if (!tableDescriptor.isPresent()) {
-      KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
-      String systemName = sinkConfig.getSystemName();
-      DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
-      GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
-      outputStream.sendTo(appDesc.getOutputStream(osd));
-    } else {
-      Table outputTable = appDesc.getTable(tableDescriptor.get());
-      if (outputTable == null) {
-        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
-        LOG.error(msg);
-        throw new SamzaException(msg);
-      }
-      outputStream.sendTo(outputTable);
-    }
-
     appDesc.withContextManager(new ContextManager() {
       @Override
       public void init(Config config, TaskContext taskContext) {

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 2dc28be..771a5d5 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -27,15 +27,15 @@ import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 
 
 /**
@@ -53,6 +53,9 @@ class ScanTranslator {
   }
 
   private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> {
+    // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
+    // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
+    // initialization.
     private transient SamzaRelConverter msgConverter;
     private final String streamName;
 
@@ -83,7 +86,8 @@ class ScanTranslator {
     final String streamName = sqlIOConfig.getStreamName();
 
     KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
-    DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+    DelegatingSystemDescriptor
+        sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
     GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
     MessageStream<KV<Object, Object>> inputStream = streamAppDesc.getInputStream(isd);
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName));

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
index cc339f1..2005c21 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -42,7 +42,7 @@ public class TestSamzaSqlTable {
 
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
-    String sql1 = "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1";
+    String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -58,7 +58,7 @@ public class TestSamzaSqlTable {
     TestIOResolverFactory.TestTable.records.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
-    String sql1 = "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1";
+    String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id __key__, name from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index dda0e14..46c0651 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -20,19 +20,25 @@
 package org.apache.samza.sql.runner;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
 
 public class TestSamzaSqlApplicationConfig {
 
@@ -42,8 +48,14 @@ public class TestSamzaSqlApplicationConfig {
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
     String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
     int numUdfs = config.get(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length;
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
-    Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size());
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size());
     Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
     Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
@@ -54,17 +66,23 @@ public class TestSamzaSqlApplicationConfig {
 
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
 
-
     try {
       // Fail because no SQL config
-      new SamzaSqlApplicationConfig(new MapConfig(config));
+      fetchSqlFromConfig(config);
       Assert.fail();
     } catch (SamzaException e) {
     }
 
     // Pass
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
-    new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
     testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
 
@@ -85,7 +103,12 @@ public class TestSamzaSqlApplicationConfig {
         "insert into testavro.Profile select * from testavro.SIMPLE1");
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
 
     Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
     Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
@@ -99,14 +122,24 @@ public class TestSamzaSqlApplicationConfig {
   private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
     Map<String, String> badConfigs = new HashMap<>(config);
     badConfigs.remove(configKey);
-    new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+    List<String> sqlStmts = fetchSqlFromConfig(badConfigs);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
   }
 
   private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) {
     Map<String, String> badConfigs = new HashMap<>(config);
     badConfigs.remove(configKey);
     try {
-      new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+      List<String> sqlStmts = fetchSqlFromConfig(badConfigs);
+      List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+      new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
+          queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+              .collect(Collectors.toSet()),
+          queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
       Assert.fail();
     } catch (IllegalArgumentException e) {
       // swallow

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index 9fab5d5..1ac804e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -36,7 +36,7 @@ public class TestSamzaSqlApplicationRunner {
   @Test
   public void testComputeSamzaConfigs() {
     Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id,long_value) select id, MyTest(id) as long_value from testavro.SIMPLE1";
     configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1);
     configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName());
     MapConfig samzaConfig = new MapConfig(configs);

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 676781c..458196f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -31,7 +31,6 @@ import java.util.stream.IntStream;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.calcite.avatica.util.ByteString;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.sql.avro.schemas.AddressRecord;
@@ -50,7 +49,6 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +71,7 @@ public class TestAvroSystemFactory implements SystemFactory {
   public static final byte[] DEFAULT_TRACKING_ID_BYTES =
       {76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1};
 
+
   public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
 
   public static List<String> getPageKeyProfileNameJoin(int numMessages) {

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 14e2243..a96fd08 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -128,6 +128,9 @@ public class SamzaSqlTestConfig {
         "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
 
     staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
         "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
 
     staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
index 1723e0e..a84f347 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.List;
 
+import org.apache.samza.sql.testutil.SqlFileParser;
 import org.junit.Assert;
 import org.junit.Test;