You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/03/22 00:58:16 UTC

[2/3] samza git commit: Add stream-table join support for samza sql

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 04fdec5..5309838 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -1,5 +1,3 @@
-package org.apache.samza.sql;
-
 /*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
@@ -19,27 +17,40 @@ package org.apache.samza.sql;
 * under the License.
 */
 
+package org.apache.samza.sql;
+
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.apache.samza.sql.translator.QueryTranslator;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 
 public class TestQueryTranslator {
 
+  private final Map<String, String> configs = new HashMap<>();
+
+  @Before
+  public void setUp() {
+    configs.put("job.default.system", "kafka");
+  }
+
   @Test
   public void testTranslate() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1");
+        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
@@ -100,4 +111,357 @@ public class TestQueryTranslator {
     Assert.assertEquals("COMPLEX1",
         streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
   }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithoutJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithFullJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " full join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = IllegalStateException.class)
+  public void testTranslateStreamTableJoinWithSelfJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p1.name as profileName"
+            + " from testavro.PROFILE.`$table` as p1"
+            + " join testavro.PROFILE.`$table` as p2"
+            + " on p1.id = p2.id";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithThetaCondition() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id <> pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableCrossJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithAndLiteralCondition() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId and p.name = 'John'";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithSubQuery() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " where exists "
+            + " (select p.id from testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateTableTableJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW.`$table` as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamStreamJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateJoinWithIncorrectLeftJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW.`$table` as pv"
+            + " left join testavro.PROFILE as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateJoinWithIncorrectRightJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " right join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableInnerJoinWithMissingStream() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String configSourceResolverDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    config.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedSourceResolverFactory.class.getName());
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableInnerJoinWithUdf() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on MyTest(p.id) = MyTest(pv.profileId)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+  @Test
+  public void testTranslateStreamTableInnerJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+  }
+
+  @Test
+  public void testTranslateStreamTableLeftJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " left join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+  }
+
+  @Test
+  public void testTranslateStreamTableRightJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PROFILE.`$table` as p"
+            + " right join testavro.PAGEVIEW as pv"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
index 1c5fc41..0804a6d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
@@ -21,13 +21,13 @@ package org.apache.samza.sql;
 
 import java.util.HashMap;
 import java.util.Map;
-import junit.framework.Assert;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -69,11 +69,11 @@ public class TestSamzaSqlApplicationConfig {
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
     String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "testavro");
 
-    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER);
+    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
 
     // Configs for the unused system "log" is not mandatory.
     String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "log");
-    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER);
+    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
   }
 
   private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
index 97196e2..0bfd721 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
@@ -38,6 +38,21 @@ public class TestSamzaSqlQueryParser {
   }
 
   @Test
+  public void testParseJoinQuery() {
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getOutputSource());
+    Assert.assertEquals(2, queryInfo.getInputSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getInputSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getInputSources().get(1));
+  }
+
+  @Test
   public void testParseInvalidQuery() {
 
     try {
@@ -58,13 +73,4 @@ public class TestSamzaSqlQueryParser {
     } catch (SamzaException e) {
     }
   }
-
-  @Test
-  public void testParseJoin() {
-    try {
-      SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar1,tracking.bar2");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
new file mode 100644
index 0000000..3da004a
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
@@ -0,0 +1,116 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.translator.SamzaSqlRelMessageJoinFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRelMessageJoinFunction {
+
+  private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
+  private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
+  private List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14");
+  private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5");
+
+  @Test
+  public void testWithInnerJoinWithTableOnRight() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+    List<Integer> tableKeyIds = Arrays.asList(0, 1);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size());
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    expectedFieldValues.addAll(tableFieldValues);
+    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+  }
+
+  @Test
+  public void testWithInnerJoinWithTableOnLeft() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 2);
+    List<Integer> tableKeyIds = Arrays.asList(0, 2);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size());
+    List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
+    expectedFieldNames.addAll(streamFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
+    expectedFieldValues.addAll(streamFieldValues);
+    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+  }
+
+  @Test
+  public void testNullRecordWithInnerJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+    Assert.assertNull(outMsg);
+  }
+
+  @Test
+  public void testNullRecordWithLeftOuterJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.LEFT;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames,
+            tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+
+    Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size());
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList()));
+    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
new file mode 100644
index 0000000..3416ee1
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -0,0 +1,43 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRelMessageSerde {
+
+  private List<Object> values = Arrays.asList("value1", 1, null);
+  private List<String> names = Arrays.asList("field1", "field2", "field3");
+
+  @Test
+  public void testWithDifferentFields() {
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    JsonSerdeV2<SamzaSqlRelMessage> serde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+    SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message));
+    Assert.assertEquals(resultMsg.getFieldNames(), names);
+    Assert.assertEquals(resultMsg.getFieldValues(), values);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc
new file mode 100644
index 0000000..a235c28
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "Company",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Company id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "name",
+            "doc" : "Company name.",
+            "type": ["null", "string"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java
new file mode 100644
index 0000000..9513acf
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class Company extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Company\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null}]}");
+  /** Company id. */
+  public java.lang.Integer id;
+  /** Company name. */
+  public java.lang.CharSequence name;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return name;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: name = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
new file mode 100644
index 0000000..5117a5e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "EnrichedPageView",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "pageKey",
+            "doc": "Page key.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "companyName",
+            "doc" : "Company name.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "profileName",
+            "doc" : "Profile name.",
+            "type": ["null", "string"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
new file mode 100644
index 0000000..cf3f62d
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class EnrichedPageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null}]}");
+  /** Page key. */
+  public java.lang.CharSequence pageKey;
+  /** Company name. */
+  public java.lang.CharSequence companyName;
+  /** Profile name. */
+  public java.lang.CharSequence profileName;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return pageKey;
+    case 1: return companyName;
+    case 2: return profileName;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: pageKey = (java.lang.CharSequence)value$; break;
+    case 1: companyName = (java.lang.CharSequence)value$; break;
+    case 2: profileName = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc
new file mode 100644
index 0000000..5c107ab
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "PageView",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "pageKey",
+            "doc": "Page key.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "profileId",
+            "doc" : "Profile id.",
+            "type": ["null", "int"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java
new file mode 100644
index 0000000..21e7bb7
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class PageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"profileId\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null}]}");
+  /** Page key. */
+  public java.lang.CharSequence pageKey;
+  /** Profile id. */
+  public java.lang.Integer profileId;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return pageKey;
+    case 1: return profileId;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: pageKey = (java.lang.CharSequence)value$; break;
+    case 1: profileId = (java.lang.Integer)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
new file mode 100644
index 0000000..4e5e7dc
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "Profile",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Profile id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "name",
+            "doc" : "Profile name.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "companyId",
+            "doc" : "Company id.",
+            "type": ["null", "int"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
new file mode 100644
index 0000000..b5c1828
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class Profile extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null}]}");
+  /** Profile id. */
+  public java.lang.Integer id;
+  /** Profile name. */
+  public java.lang.CharSequence name;
+  /** Company id. */
+  public java.lang.Integer companyId;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return name;
+    case 2: return companyId;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: name = (java.lang.CharSequence)value$; break;
+    case 2: companyId = (java.lang.Integer)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
deleted file mode 100644
index 8baa9e7..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.e2e;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.MyTestUdf;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class TestSamzaSqlEndToEnd {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
-
-  @Test
-  public void testEndToEnd() throws Exception {
-    int numMessages = 20;
-
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
-        .sorted()
-        .collect(Collectors.toList());
-    Assert.assertEquals(numMessages, outMessages.size());
-    Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
-  }
-
-  @Test
-  public void testEndToEndFlatten() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
-    String sql1 =
-        "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
-
-    int expectedMessages = 0;
-    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
-    Assert.assertEquals(expectedMessages, outMessages.size());
-  }
-
-  @Test
-  public void testEndToEndSubQuery() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 =
-        "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
-
-    int expectedMessages = 0;
-    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
-    Assert.assertEquals(expectedMessages, outMessages.size());
-  }
-
-  @Test
-  public void testEndToEndUdf() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    LOG.info("output Messages " + TestAvroSystemFactory.messages);
-
-    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("long_value").toString()))
-        .sorted()
-        .collect(Collectors.toList());
-    Assert.assertEquals(outMessages.size(), numMessages);
-    MyTestUdf udf = new MyTestUdf();
-
-    Assert.assertTrue(
-        IntStream.range(0, numMessages).map(udf::execute).boxed().collect(Collectors.toList()).equals(outMessages));
-  }
-
-  @Test
-  public void testRegexMatchUdfInWhereClause() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    LOG.info("output Messages " + TestAvroSystemFactory.messages);
-    // There should be two messages that contain "4"
-    Assert.assertEquals(TestAvroSystemFactory.messages.size(), 2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
index 5655a81..a8731fb 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
@@ -26,10 +26,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 
+import static org.apache.samza.system.IncomingMessageEnvelope.*;
+
 
 public class SimpleSystemAdmin implements SystemAdmin {
 
@@ -46,7 +49,7 @@ public class SimpleSystemAdmin implements SystemAdmin {
     return streamNames.stream()
         .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
             Collections.singletonMap(new Partition(0),
-                new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+                new SystemStreamMetadata.SystemStreamPartitionMetadata(null, END_OF_STREAM_OFFSET, null)))));
   }
 
   @Override
@@ -58,4 +61,10 @@ public class SimpleSystemAdmin implements SystemAdmin {
     }
     return offset1.compareTo(offset2);
   }
+
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    // Do nothing.
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 3a9ae16..6dcad25 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -21,6 +21,7 @@ package org.apache.samza.sql.system;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,7 +32,10 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.sql.avro.schemas.Company;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -46,9 +50,42 @@ import org.slf4j.LoggerFactory;
 
 public class TestAvroSystemFactory implements SystemFactory {
   private static final Logger LOG = LoggerFactory.getLogger(TestAvroSystemFactory.class);
+
   public static final String CFG_NUM_MESSAGES = "numMessages";
+  public static final String CFG_INCLUDE_NULL_FOREIGN_KEYS = "includeNullForeignKeys";
   public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
 
+  public static final String[] profiles = {"John", "Mike", "Mary", "Joe", "Brad", "Jennifer"};
+  public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
+  public static final String[] pagekeys = {"inbox", "home", "search", "pymk", "group", "job"};
+
+  public static List<String> getPageKeyProfileNameJoin(int numMessages) {
+    return IntStream.range(0, numMessages)
+                .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length])
+                .collect(Collectors.toList());
+  }
+
+  public static List<String> getPageKeyProfileNameJoinWithNullForeignKeys(int numMessages) {
+    // All even profileId foreign keys are null
+    return IntStream.range(0, numMessages / 2)
+        .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + profiles[(i * 2 + 1) % profiles.length])
+        .collect(Collectors.toList());
+  }
+
+  public static List<String> getPageKeyProfileNameOuterJoinWithNullForeignKeys(int numMessages) {
+    // All even profileId foreign keys are null
+    return IntStream.range(0, numMessages)
+        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? "null" : profiles[i % profiles.length]))
+        .collect(Collectors.toList());
+  }
+
+  public static List<String> getPageKeyProfileCompanyNameJoin(int numMessages) {
+    return IntStream.range(0, numMessages)
+        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length] +
+            "," + companies[i % companies.length])
+        .collect(Collectors.toList());
+  }
+
   @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
     return new TestAvroSystemConsumer(systemName, config);
@@ -67,10 +104,16 @@ public class TestAvroSystemFactory implements SystemFactory {
   private class TestAvroSystemConsumer implements SystemConsumer {
     public static final int DEFAULT_NUM_EVENTS = 10;
     private final int numMessages;
-    private boolean simpleRecord;
+    private final boolean includeNullForeignKeys;
+    private final Set<SystemStreamPartition> simpleRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> profileRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> companyRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> pageViewRecordMap = new HashSet<>();
 
     public TestAvroSystemConsumer(String systemName, Config config) {
       numMessages = config.getInt(String.format("systems.%s.%s", systemName, CFG_NUM_MESSAGES), DEFAULT_NUM_EVENTS);
+      includeNullForeignKeys = config.getBoolean(String.format("systems.%s.%s", systemName,
+          CFG_INCLUDE_NULL_FOREIGN_KEYS), false);
     }
 
     @Override
@@ -83,7 +126,18 @@ public class TestAvroSystemFactory implements SystemFactory {
 
     @Override
     public void register(SystemStreamPartition systemStreamPartition, String offset) {
-      simpleRecord = systemStreamPartition.getStream().toLowerCase().contains("simple");
+      if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) {
+        simpleRecordMap.add(systemStreamPartition);
+      }
+      if (systemStreamPartition.getStream().toLowerCase().contains("profile")) {
+        profileRecordMap.add(systemStreamPartition);
+      }
+      if (systemStreamPartition.getStream().toLowerCase().contains("company")) {
+        companyRecordMap.add(systemStreamPartition);
+      }
+      if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
+        pageViewRecordMap.add(systemStreamPartition);
+      }
     }
 
     @Override
@@ -93,8 +147,8 @@ public class TestAvroSystemFactory implements SystemFactory {
       set.forEach(ssp -> {
         // We send num Messages and an end of stream message following that.
         List<IncomingMessageEnvelope> envelopes = IntStream.range(0, numMessages + 1)
-            .mapToObj(i -> new IncomingMessageEnvelope(ssp,
-                i == numMessages ? IncomingMessageEnvelope.END_OF_STREAM_OFFSET : null, "key" + i, getData(i)))
+            .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i,
+                getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp))
             .collect(Collectors.toList());
         envelopeMap.put(ssp, envelopes);
       });
@@ -102,9 +156,15 @@ public class TestAvroSystemFactory implements SystemFactory {
       return envelopeMap;
     }
 
-    private Object getData(int index) {
-      if (simpleRecord) {
+    private Object getData(int index, SystemStreamPartition ssp) {
+      if (simpleRecordMap.contains(ssp)) {
         return createSimpleRecord(index);
+      } else if (profileRecordMap.contains(ssp)) {
+        return createProfileRecord(index);
+      } else if (companyRecordMap.contains(ssp)) {
+        return createCompanyRecord(index);
+      } else if (pageViewRecordMap.contains(ssp)) {
+        return createPageViewRecord(index);
       } else {
         return createComplexRecord(index);
       }
@@ -117,6 +177,29 @@ public class TestAvroSystemFactory implements SystemFactory {
       return record;
     }
 
+    private Object createProfileRecord(int index) {
+      GenericRecord record = new GenericData.Record(Profile.SCHEMA$);
+      record.put("id", index);
+      record.put("name", profiles[index % profiles.length]);
+      record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? null : index % companies.length);
+      return record;
+    }
+
+    private Object createCompanyRecord(int index) {
+      GenericRecord record = new GenericData.Record(Company.SCHEMA$);
+      record.put("id", index);
+      record.put("name", companies[index % companies.length]);
+      return record;
+    }
+
+    private Object createPageViewRecord(int index) {
+      GenericRecord record = new GenericData.Record(PageView.SCHEMA$);
+      // All even profileId foreign keys are null
+      record.put("profileId", includeNullForeignKeys && (index % 2 == 0) ? null : index);
+      record.put("pageKey", pagekeys[index % pagekeys.length]);
+      return record;
+    }
+
     private Object createComplexRecord(int index) {
       GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
       record.put("id", index);

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 92766f6..b8b2814 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -28,15 +28,20 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.sql.avro.AvroRelConverterFactory;
 import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Company;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.EnrichedPageView;
+import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
 import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 
 
@@ -48,11 +53,21 @@ public class SamzaSqlTestConfig {
   public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
 
   public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
+    return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) {
+    return fetchStaticConfigsWithFactories(props, numberOfMessages, false);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+      boolean includeNullForeignKeys) {
     HashMap<String, String> staticConfigs = new HashMap<>();
 
     staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
     staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
@@ -75,8 +90,10 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
     staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
         String.valueOf(numberOfMessages));
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+        includeNullForeignKeys ? "true" : "false");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String avroSamzaToRelMsgConverterDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
@@ -88,17 +105,31 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
         ConfigBasedAvroRelSchemaProviderFactory.class.getName());
 
-    staticConfigs.put(
-        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "Profile"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "PROFILE"), Profile.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "COMPANY"), Company.SCHEMA$.toString());
 
-    staticConfigs.put(
-        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
 
-    staticConfigs.put(
-        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
+    staticConfigs.putAll(props);
 
     return staticConfigs;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
index b9cf803..4f1d08e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.interfaces.SourceResolver;
 import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 
 
 public class TestSourceResolverFactory implements SourceResolverFactory {
@@ -33,6 +33,7 @@ public class TestSourceResolverFactory implements SourceResolverFactory {
   }
 
   private class TestSourceResolver implements SourceResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
     private final Config config;
 
     public TestSourceResolver(Config config) {
@@ -40,11 +41,26 @@ public class TestSourceResolverFactory implements SourceResolverFactory {
     }
 
     @Override
-    public SqlSystemStreamConfig fetchSourceInfo(String sourceName) {
+    public SqlSystemSourceConfig fetchSourceInfo(String sourceName) {
       String[] sourceComponents = sourceName.split("\\.");
-      Config systemConfigs = config.subset(sourceComponents[0] + ".");
-      return new SqlSystemStreamConfig(sourceComponents[0], sourceComponents[sourceComponents.length - 1],
-          Arrays.asList(sourceComponents), systemConfigs);
+      boolean isTable = false;
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        isTable = true;
+        streamIdx = endIdx - 1;
+      }
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlSystemSourceConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, isTable);
+    }
+
+    @Override
+    public boolean isTable(String sourceName) {
+      String[] sourceComponents = sourceName.split("\\.");
+      return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/resources/log4j.xml b/samza-sql/src/test/resources/log4j.xml
index 6259b48..9d29506 100644
--- a/samza-sql/src/test/resources/log4j.xml
+++ b/samza-sql/src/test/resources/log4j.xml
@@ -28,6 +28,12 @@
 
   @log4j.loggers.public_access@
   <logger name="org.apache" additivity="false">
+    <level value="INFO"/>
+    <appender-ref ref="console"/>
+  </logger>
+
+  @log4j.loggers.public_access@
+  <logger name="org.apache.calcite.sql2rel" additivity="false">
     <level value="DEBUG"/>
     <appender-ref ref="console"/>
   </logger>