You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/03/22 00:58:15 UTC

[1/3] samza git commit: Add stream-table join support for samza sql

Repository: samza
Updated Branches:
  refs/heads/master 2d7b0f52c -> 956cf412a


http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
new file mode 100644
index 0000000..3fff0f3
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -0,0 +1,385 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.test.samzasql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.JsonSerdeV2Factory;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.MyTestUdf;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
+  private final Map<String, String> configs = new HashMap<>();
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configs.put("systems.kafka.samza.key.serde", "object");
+    configs.put("systems.kafka.samza.msg.serde", "samzaSqlRelMsg");
+    configs.put("systems.kafka.default.stream.replication.factor", "1");
+    configs.put("job.default.system", "kafka");
+
+    configs.put("serializers.registry.object.class", JsonSerdeV2Factory.class.getName());
+    configs.put("serializers.registry.samzaSqlRelMsg.class", JsonSerdeV2Factory.class.getName());
+  }
+
+  @Test
+  public void testEndToEnd() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+  }
+
+  @Test
+  public void testEndToEndFlatten() throws Exception {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
+    String sql1 =
+        "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
+
+    int expectedMessages = 0;
+    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
+    for (int index = 1; index < numMessages; index++) {
+      expectedMessages = expectedMessages + Math.max(1, index);
+    }
+    Assert.assertEquals(expectedMessages, outMessages.size());
+  }
+
+  @Test
+  public void testEndToEndSubQuery() throws Exception {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 =
+        "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
+
+    int expectedMessages = 0;
+    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
+    for (int index = 1; index < numMessages; index++) {
+      expectedMessages = expectedMessages + Math.max(1, index);
+    }
+    Assert.assertEquals(expectedMessages, outMessages.size());
+  }
+
+  @Test
+  public void testEndToEndUdf() throws Exception {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    LOG.info("output Messages " + TestAvroSystemFactory.messages);
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("long_value").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(outMessages.size(), numMessages);
+    MyTestUdf udf = new MyTestUdf();
+
+    Assert.assertTrue(
+        IntStream.range(0, numMessages).map(udf::execute).boxed().collect(Collectors.toList()).equals(outMessages));
+  }
+
+  @Test
+  public void testRegexMatchUdfInWhereClause() throws Exception {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    LOG.info("output Messages " + TestAvroSystemFactory.messages);
+    // There should be two messages that contain "4"
+    Assert.assertEquals(TestAvroSystemFactory.messages.size(), 2);
+  }
+
+  @Test
+  public void testEndToEndStreamTableInnerJoin() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    staticConfigs.putAll(configs);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName "
+            + "from testavro.PROFILE.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.id = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableInnerJoinWithFilter() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    staticConfigs.putAll(configs);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName "
+            + "from testavro.PROFILE.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.id = pv.profileId "
+            + "where p.name = 'Mike'";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(4, outMessages.size());
+    List<String> expectedOutMessages =
+        TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages)
+            .stream()
+            .filter(msg -> msg.endsWith("Mike"))
+            .collect(Collectors.toList());
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableInnerJoinWithNullForeignKeys() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName "
+            + "from testavro.PAGEVIEW as pv "
+            + "join testavro.PROFILE.`$table` as p "
+            + " on pv.profileId = p.id";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    // Half the foreign keys are null.
+    Assert.assertEquals(numMessages / 2, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages);
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableLeftJoin() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName "
+            + "from testavro.PAGEVIEW as pv "
+            + "left join testavro.PROFILE.`$table` as p "
+            + " on pv.profileId = p.id";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages =
+        TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages);
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableRightJoin() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName "
+            + "from testavro.PROFILE.`$table` as p "
+            + "right join testavro.PAGEVIEW as pv "
+            + " on p.id = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages =
+        TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages);
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableTableJoin() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName, c.name as companyName "
+            + "from testavro.PAGEVIEW as pv "
+            + "join testavro.PROFILE.`$table` as p "
+            + " on p.id = pv.profileId "
+            + " join testavro.COMPANY.`$table` as c "
+            + " on p.companyId = c.id";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + ((GenericRecord) x.getMessage()).get("profileName").toString() + ","
+            + ((GenericRecord) x.getMessage()).get("companyName").toString())
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(numMessages);
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableTableJoinWithCompositeKey() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName, c.name as companyName "
+            + "from testavro.PAGEVIEW as pv "
+            + "join testavro.PROFILE.`$table` as p "
+            + " on p.id = pv.profileId "
+            + " join testavro.COMPANY.`$table` as c "
+            + " on p.companyId = c.id AND c.id = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + ((GenericRecord) x.getMessage()).get("profileName").toString() + ","
+            + ((GenericRecord) x.getMessage()).get("companyName").toString())
+        .collect(Collectors.toList());
+    Assert.assertEquals(TestAvroSystemFactory.companies.length, outMessages.size());
+    List<String> expectedOutMessages =
+        TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(TestAvroSystemFactory.companies.length);
+    Assert.assertEquals(outMessages, expectedOutMessages);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index 11a49f7..251ea16 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -40,7 +40,7 @@ import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
 import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.JsonUtil;
@@ -148,15 +148,15 @@ public class SamzaSqlConsole {
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest");
 
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String logSystemConfigPrefix =
         String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
     String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
     staticConfigs.put(logSystemConfigPrefix + "samza.factory", ConsoleLoggingSystemFactory.class.getName());
-    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "json");
-    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "json");
+    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String avroSamzaToRelMsgConverterDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");


[2/3] samza git commit: Add stream-table join support for samza sql

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 04fdec5..5309838 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -1,5 +1,3 @@
-package org.apache.samza.sql;
-
 /*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
@@ -19,27 +17,40 @@ package org.apache.samza.sql;
 * under the License.
 */
 
+package org.apache.samza.sql;
+
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.apache.samza.sql.translator.QueryTranslator;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 
 public class TestQueryTranslator {
 
+  private final Map<String, String> configs = new HashMap<>();
+
+  @Before
+  public void setUp() {
+    configs.put("job.default.system", "kafka");
+  }
+
   @Test
   public void testTranslate() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1");
+        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
@@ -100,4 +111,357 @@ public class TestQueryTranslator {
     Assert.assertEquals("COMPLEX1",
         streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
   }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithoutJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithFullJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " full join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = IllegalStateException.class)
+  public void testTranslateStreamTableJoinWithSelfJoinOperator() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p1.name as profileName"
+            + " from testavro.PROFILE.`$table` as p1"
+            + " join testavro.PROFILE.`$table` as p2"
+            + " on p1.id = p2.id";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithThetaCondition() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id <> pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableCrossJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithAndLiteralCondition() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId and p.name = 'John'";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithSubQuery() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " where exists "
+            + " (select p.id from testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateTableTableJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW.`$table` as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamStreamJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateJoinWithIncorrectLeftJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW.`$table` as pv"
+            + " left join testavro.PROFILE as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateJoinWithIncorrectRightJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " right join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableInnerJoinWithMissingStream() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String configSourceResolverDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    config.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedSourceResolverFactory.class.getName());
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testTranslateStreamTableInnerJoinWithUdf() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on MyTest(p.id) = MyTest(pv.profileId)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+  }
+  @Test
+  public void testTranslateStreamTableInnerJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+  }
+
+  @Test
+  public void testTranslateStreamTableLeftJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " left join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+  }
+
+  @Test
+  public void testTranslateStreamTableRightJoin() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PROFILE.`$table` as p"
+            + " right join testavro.PAGEVIEW as pv"
+            + " on p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
index 1c5fc41..0804a6d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
@@ -21,13 +21,13 @@ package org.apache.samza.sql;
 
 import java.util.HashMap;
 import java.util.Map;
-import junit.framework.Assert;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -69,11 +69,11 @@ public class TestSamzaSqlApplicationConfig {
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
     String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "testavro");
 
-    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER);
+    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
 
     // Configs for the unused system "log" is not mandatory.
     String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "log");
-    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER);
+    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
   }
 
   private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
index 97196e2..0bfd721 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
@@ -38,6 +38,21 @@ public class TestSamzaSqlQueryParser {
   }
 
   @Test
+  public void testParseJoinQuery() {
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getOutputSource());
+    Assert.assertEquals(2, queryInfo.getInputSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getInputSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getInputSources().get(1));
+  }
+
+  @Test
   public void testParseInvalidQuery() {
 
     try {
@@ -58,13 +73,4 @@ public class TestSamzaSqlQueryParser {
     } catch (SamzaException e) {
     }
   }
-
-  @Test
-  public void testParseJoin() {
-    try {
-      SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar1,tracking.bar2");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
new file mode 100644
index 0000000..3da004a
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
@@ -0,0 +1,116 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.translator.SamzaSqlRelMessageJoinFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRelMessageJoinFunction {
+
+  private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
+  private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
+  private List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14");
+  private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5");
+
+  @Test
+  public void testWithInnerJoinWithTableOnRight() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+    List<Integer> tableKeyIds = Arrays.asList(0, 1);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size());
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    expectedFieldValues.addAll(tableFieldValues);
+    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+  }
+
+  @Test
+  public void testWithInnerJoinWithTableOnLeft() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 2);
+    List<Integer> tableKeyIds = Arrays.asList(0, 2);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size());
+    List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
+    expectedFieldNames.addAll(streamFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
+    expectedFieldValues.addAll(streamFieldValues);
+    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+  }
+
+  @Test
+  public void testNullRecordWithInnerJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+    Assert.assertNull(outMsg);
+  }
+
+  @Test
+  public void testNullRecordWithLeftOuterJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.LEFT;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames,
+            tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+
+    Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size());
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList()));
+    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
new file mode 100644
index 0000000..3416ee1
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -0,0 +1,43 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRelMessageSerde {
+
+  private List<Object> values = Arrays.asList("value1", 1, null);
+  private List<String> names = Arrays.asList("field1", "field2", "field3");
+
+  @Test
+  public void testWithDifferentFields() {
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    JsonSerdeV2<SamzaSqlRelMessage> serde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+    SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message));
+    Assert.assertEquals(resultMsg.getFieldNames(), names);
+    Assert.assertEquals(resultMsg.getFieldValues(), values);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc
new file mode 100644
index 0000000..a235c28
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "Company",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Company id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "name",
+            "doc" : "Company name.",
+            "type": ["null", "string"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java
new file mode 100644
index 0000000..9513acf
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class Company extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Company\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null}]}");
+  /** Company id. */
+  public java.lang.Integer id;
+  /** Company name. */
+  public java.lang.CharSequence name;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return name;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: name = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
new file mode 100644
index 0000000..5117a5e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "EnrichedPageView",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "pageKey",
+            "doc": "Page key.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "companyName",
+            "doc" : "Company name.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "profileName",
+            "doc" : "Profile name.",
+            "type": ["null", "string"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
new file mode 100644
index 0000000..cf3f62d
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class EnrichedPageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null}]}");
+  /** Page key. */
+  public java.lang.CharSequence pageKey;
+  /** Company name. */
+  public java.lang.CharSequence companyName;
+  /** Profile name. */
+  public java.lang.CharSequence profileName;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return pageKey;
+    case 1: return companyName;
+    case 2: return profileName;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: pageKey = (java.lang.CharSequence)value$; break;
+    case 1: companyName = (java.lang.CharSequence)value$; break;
+    case 2: profileName = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc
new file mode 100644
index 0000000..5c107ab
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "PageView",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "pageKey",
+            "doc": "Page key.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "profileId",
+            "doc" : "Profile id.",
+            "type": ["null", "int"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java
new file mode 100644
index 0000000..21e7bb7
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class PageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"profileId\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null}]}");
+  /** Page key. */
+  public java.lang.CharSequence pageKey;
+  /** Profile id. */
+  public java.lang.Integer profileId;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return pageKey;
+    case 1: return profileId;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: pageKey = (java.lang.CharSequence)value$; break;
+    case 1: profileId = (java.lang.Integer)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
new file mode 100644
index 0000000..4e5e7dc
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "Profile",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Profile id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "name",
+            "doc" : "Profile name.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "companyId",
+            "doc" : "Company id.",
+            "type": ["null", "int"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
new file mode 100644
index 0000000..b5c1828
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class Profile extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null}]}");
+  /** Profile id. */
+  public java.lang.Integer id;
+  /** Profile name. */
+  public java.lang.CharSequence name;
+  /** Company id. */
+  public java.lang.Integer companyId;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return name;
+    case 2: return companyId;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: name = (java.lang.CharSequence)value$; break;
+    case 2: companyId = (java.lang.Integer)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
deleted file mode 100644
index 8baa9e7..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.e2e;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.MyTestUdf;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class TestSamzaSqlEndToEnd {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
-
-  @Test
-  public void testEndToEnd() throws Exception {
-    int numMessages = 20;
-
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
-        .sorted()
-        .collect(Collectors.toList());
-    Assert.assertEquals(numMessages, outMessages.size());
-    Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
-  }
-
-  @Test
-  public void testEndToEndFlatten() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
-    String sql1 =
-        "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
-
-    int expectedMessages = 0;
-    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
-    Assert.assertEquals(expectedMessages, outMessages.size());
-  }
-
-  @Test
-  public void testEndToEndSubQuery() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 =
-        "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
-
-    int expectedMessages = 0;
-    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
-    Assert.assertEquals(expectedMessages, outMessages.size());
-  }
-
-  @Test
-  public void testEndToEndUdf() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    LOG.info("output Messages " + TestAvroSystemFactory.messages);
-
-    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("long_value").toString()))
-        .sorted()
-        .collect(Collectors.toList());
-    Assert.assertEquals(outMessages.size(), numMessages);
-    MyTestUdf udf = new MyTestUdf();
-
-    Assert.assertTrue(
-        IntStream.range(0, numMessages).map(udf::execute).boxed().collect(Collectors.toList()).equals(outMessages));
-  }
-
-  @Test
-  public void testRegexMatchUdfInWhereClause() throws Exception {
-    int numMessages = 20;
-    TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)";
-    List<String> sqlStmts = Collections.singletonList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
-
-    LOG.info("output Messages " + TestAvroSystemFactory.messages);
-    // There should be two messages that contain "4"
-    Assert.assertEquals(TestAvroSystemFactory.messages.size(), 2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
index 5655a81..a8731fb 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
@@ -26,10 +26,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 
+import static org.apache.samza.system.IncomingMessageEnvelope.*;
+
 
 public class SimpleSystemAdmin implements SystemAdmin {
 
@@ -46,7 +49,7 @@ public class SimpleSystemAdmin implements SystemAdmin {
     return streamNames.stream()
         .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
             Collections.singletonMap(new Partition(0),
-                new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+                new SystemStreamMetadata.SystemStreamPartitionMetadata(null, END_OF_STREAM_OFFSET, null)))));
   }
 
   @Override
@@ -58,4 +61,10 @@ public class SimpleSystemAdmin implements SystemAdmin {
     }
     return offset1.compareTo(offset2);
   }
+
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    // Do nothing.
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 3a9ae16..6dcad25 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -21,6 +21,7 @@ package org.apache.samza.sql.system;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,7 +32,10 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.sql.avro.schemas.Company;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -46,9 +50,42 @@ import org.slf4j.LoggerFactory;
 
 public class TestAvroSystemFactory implements SystemFactory {
   private static final Logger LOG = LoggerFactory.getLogger(TestAvroSystemFactory.class);
+
   public static final String CFG_NUM_MESSAGES = "numMessages";
+  public static final String CFG_INCLUDE_NULL_FOREIGN_KEYS = "includeNullForeignKeys";
   public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
 
+  public static final String[] profiles = {"John", "Mike", "Mary", "Joe", "Brad", "Jennifer"};
+  public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
+  public static final String[] pagekeys = {"inbox", "home", "search", "pymk", "group", "job"};
+
+  public static List<String> getPageKeyProfileNameJoin(int numMessages) {
+    return IntStream.range(0, numMessages)
+                .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length])
+                .collect(Collectors.toList());
+  }
+
+  public static List<String> getPageKeyProfileNameJoinWithNullForeignKeys(int numMessages) {
+    // All even profileId foreign keys are null
+    return IntStream.range(0, numMessages / 2)
+        .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + profiles[(i * 2 + 1) % profiles.length])
+        .collect(Collectors.toList());
+  }
+
+  public static List<String> getPageKeyProfileNameOuterJoinWithNullForeignKeys(int numMessages) {
+    // All even profileId foreign keys are null
+    return IntStream.range(0, numMessages)
+        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? "null" : profiles[i % profiles.length]))
+        .collect(Collectors.toList());
+  }
+
+  public static List<String> getPageKeyProfileCompanyNameJoin(int numMessages) {
+    return IntStream.range(0, numMessages)
+        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length] +
+            "," + companies[i % companies.length])
+        .collect(Collectors.toList());
+  }
+
   @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
     return new TestAvroSystemConsumer(systemName, config);
@@ -67,10 +104,16 @@ public class TestAvroSystemFactory implements SystemFactory {
   private class TestAvroSystemConsumer implements SystemConsumer {
     public static final int DEFAULT_NUM_EVENTS = 10;
     private final int numMessages;
-    private boolean simpleRecord;
+    private final boolean includeNullForeignKeys;
+    private final Set<SystemStreamPartition> simpleRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> profileRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> companyRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> pageViewRecordMap = new HashSet<>();
 
     public TestAvroSystemConsumer(String systemName, Config config) {
       numMessages = config.getInt(String.format("systems.%s.%s", systemName, CFG_NUM_MESSAGES), DEFAULT_NUM_EVENTS);
+      includeNullForeignKeys = config.getBoolean(String.format("systems.%s.%s", systemName,
+          CFG_INCLUDE_NULL_FOREIGN_KEYS), false);
     }
 
     @Override
@@ -83,7 +126,18 @@ public class TestAvroSystemFactory implements SystemFactory {
 
     @Override
     public void register(SystemStreamPartition systemStreamPartition, String offset) {
-      simpleRecord = systemStreamPartition.getStream().toLowerCase().contains("simple");
+      if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) {
+        simpleRecordMap.add(systemStreamPartition);
+      }
+      if (systemStreamPartition.getStream().toLowerCase().contains("profile")) {
+        profileRecordMap.add(systemStreamPartition);
+      }
+      if (systemStreamPartition.getStream().toLowerCase().contains("company")) {
+        companyRecordMap.add(systemStreamPartition);
+      }
+      if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
+        pageViewRecordMap.add(systemStreamPartition);
+      }
     }
 
     @Override
@@ -93,8 +147,8 @@ public class TestAvroSystemFactory implements SystemFactory {
       set.forEach(ssp -> {
         // We send num Messages and an end of stream message following that.
         List<IncomingMessageEnvelope> envelopes = IntStream.range(0, numMessages + 1)
-            .mapToObj(i -> new IncomingMessageEnvelope(ssp,
-                i == numMessages ? IncomingMessageEnvelope.END_OF_STREAM_OFFSET : null, "key" + i, getData(i)))
+            .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i,
+                getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp))
             .collect(Collectors.toList());
         envelopeMap.put(ssp, envelopes);
       });
@@ -102,9 +156,15 @@ public class TestAvroSystemFactory implements SystemFactory {
       return envelopeMap;
     }
 
-    private Object getData(int index) {
-      if (simpleRecord) {
+    private Object getData(int index, SystemStreamPartition ssp) {
+      if (simpleRecordMap.contains(ssp)) {
         return createSimpleRecord(index);
+      } else if (profileRecordMap.contains(ssp)) {
+        return createProfileRecord(index);
+      } else if (companyRecordMap.contains(ssp)) {
+        return createCompanyRecord(index);
+      } else if (pageViewRecordMap.contains(ssp)) {
+        return createPageViewRecord(index);
       } else {
         return createComplexRecord(index);
       }
@@ -117,6 +177,29 @@ public class TestAvroSystemFactory implements SystemFactory {
       return record;
     }
 
+    private Object createProfileRecord(int index) {
+      GenericRecord record = new GenericData.Record(Profile.SCHEMA$);
+      record.put("id", index);
+      record.put("name", profiles[index % profiles.length]);
+      record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? null : index % companies.length);
+      return record;
+    }
+
+    private Object createCompanyRecord(int index) {
+      GenericRecord record = new GenericData.Record(Company.SCHEMA$);
+      record.put("id", index);
+      record.put("name", companies[index % companies.length]);
+      return record;
+    }
+
+    private Object createPageViewRecord(int index) {
+      GenericRecord record = new GenericData.Record(PageView.SCHEMA$);
+      // All even profileId foreign keys are null
+      record.put("profileId", includeNullForeignKeys && (index % 2 == 0) ? null : index);
+      record.put("pageKey", pagekeys[index % pagekeys.length]);
+      return record;
+    }
+
     private Object createComplexRecord(int index) {
       GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
       record.put("id", index);

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 92766f6..b8b2814 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -28,15 +28,20 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.sql.avro.AvroRelConverterFactory;
 import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Company;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.EnrichedPageView;
+import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
 import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 
 
@@ -48,11 +53,21 @@ public class SamzaSqlTestConfig {
   public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
 
   public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
+    return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) {
+    return fetchStaticConfigsWithFactories(props, numberOfMessages, false);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+      boolean includeNullForeignKeys) {
     HashMap<String, String> staticConfigs = new HashMap<>();
 
     staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
     staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
@@ -75,8 +90,10 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
     staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
         String.valueOf(numberOfMessages));
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+        includeNullForeignKeys ? "true" : "false");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String avroSamzaToRelMsgConverterDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
@@ -88,17 +105,31 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
         ConfigBasedAvroRelSchemaProviderFactory.class.getName());
 
-    staticConfigs.put(
-        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "Profile"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "PROFILE"), Profile.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "COMPANY"), Company.SCHEMA$.toString());
 
-    staticConfigs.put(
-        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
 
-    staticConfigs.put(
-        configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
+    staticConfigs.putAll(props);
 
     return staticConfigs;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
index b9cf803..4f1d08e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.interfaces.SourceResolver;
 import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 
 
 public class TestSourceResolverFactory implements SourceResolverFactory {
@@ -33,6 +33,7 @@ public class TestSourceResolverFactory implements SourceResolverFactory {
   }
 
   private class TestSourceResolver implements SourceResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
     private final Config config;
 
     public TestSourceResolver(Config config) {
@@ -40,11 +41,26 @@ public class TestSourceResolverFactory implements SourceResolverFactory {
     }
 
     @Override
-    public SqlSystemStreamConfig fetchSourceInfo(String sourceName) {
+    public SqlSystemSourceConfig fetchSourceInfo(String sourceName) {
       String[] sourceComponents = sourceName.split("\\.");
-      Config systemConfigs = config.subset(sourceComponents[0] + ".");
-      return new SqlSystemStreamConfig(sourceComponents[0], sourceComponents[sourceComponents.length - 1],
-          Arrays.asList(sourceComponents), systemConfigs);
+      boolean isTable = false;
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        isTable = true;
+        streamIdx = endIdx - 1;
+      }
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlSystemSourceConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, isTable);
+    }
+
+    @Override
+    public boolean isTable(String sourceName) {
+      String[] sourceComponents = sourceName.split("\\.");
+      return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/resources/log4j.xml b/samza-sql/src/test/resources/log4j.xml
index 6259b48..9d29506 100644
--- a/samza-sql/src/test/resources/log4j.xml
+++ b/samza-sql/src/test/resources/log4j.xml
@@ -28,6 +28,12 @@
 
   @log4j.loggers.public_access@
   <logger name="org.apache" additivity="false">
+    <level value="INFO"/>
+    <appender-ref ref="console"/>
+  </logger>
+
+  @log4j.loggers.public_access@
+  <logger name="org.apache.calcite.sql2rel" additivity="false">
     <level value="DEBUG"/>
     <appender-ref ref="console"/>
   </logger>


[3/3] samza git commit: Add stream-table join support for samza sql

Posted by xi...@apache.org.
Add stream-table join support for samza sql

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #425 from atoomula/join


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/956cf412
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/956cf412
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/956cf412

Branch: refs/heads/master
Commit: 956cf412a44812d54286fd9a0c4d167239387362
Parents: 2d7b0f5
Author: Aditya Toomula <at...@linkedin.com>
Authored: Wed Mar 21 17:58:07 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Mar 21 17:58:07 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   5 +-
 .../samza/runtime/LocalApplicationRunner.java   |   1 +
 .../org/apache/samza/config/StreamConfig.scala  |   1 +
 .../samza/sql/data/SamzaSqlCompositeKey.java    |  81 ++++
 .../sql/data/SamzaSqlExecutionContext.java      |   4 +
 .../samza/sql/data/SamzaSqlRelMessage.java      |  29 +-
 .../impl/ConfigBasedSourceResolverFactory.java  |  50 ++-
 .../samza/sql/interfaces/SourceResolver.java    |  14 +-
 .../sql/interfaces/SqlSystemSourceConfig.java   | 129 +++++++
 .../sql/interfaces/SqlSystemStreamConfig.java   | 117 ------
 .../apache/samza/sql/planner/QueryPlanner.java  |  12 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   |  21 +-
 .../sql/runner/SamzaSqlApplicationRunner.java   |   6 +-
 .../samza/sql/testutil/SamzaSqlQueryParser.java |  31 +-
 .../samza/sql/translator/FilterTranslator.java  |  20 +-
 .../samza/sql/translator/JoinTranslator.java    | 279 ++++++++++++++
 .../samza/sql/translator/ProjectTranslator.java |   4 +-
 .../samza/sql/translator/QueryTranslator.java   |  16 +-
 .../SamzaSqlRelMessageJoinFunction.java         | 121 ++++++
 .../samza/sql/translator/ScanTranslator.java    |  12 +-
 .../apache/samza/sql/TestQueryTranslator.java   | 370 +++++++++++++++++-
 .../sql/TestSamzaSqlApplicationConfig.java      |   8 +-
 .../samza/sql/TestSamzaSqlQueryParser.java      |  24 +-
 .../sql/TestSamzaSqlRelMessageJoinFunction.java | 116 ++++++
 .../samza/sql/TestSamzaSqlRelMessageSerde.java  |  43 +++
 .../apache/samza/sql/avro/schemas/Company.avsc  |  39 ++
 .../apache/samza/sql/avro/schemas/Company.java  |  52 +++
 .../sql/avro/schemas/EnrichedPageView.avsc      |  45 +++
 .../sql/avro/schemas/EnrichedPageView.java      |  56 +++
 .../apache/samza/sql/avro/schemas/PageView.avsc |  39 ++
 .../apache/samza/sql/avro/schemas/PageView.java |  52 +++
 .../apache/samza/sql/avro/schemas/Profile.avsc  |  45 +++
 .../apache/samza/sql/avro/schemas/Profile.java  |  56 +++
 .../samza/sql/e2e/TestSamzaSqlEndToEnd.java     | 153 --------
 .../samza/sql/system/SimpleSystemAdmin.java     |  11 +-
 .../samza/sql/system/TestAvroSystemFactory.java |  95 ++++-
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |  55 ++-
 .../sql/testutil/TestSourceResolverFactory.java |  26 +-
 samza-sql/src/test/resources/log4j.xml          |   6 +
 .../test/samzasql/TestSamzaSqlEndToEnd.java     | 385 +++++++++++++++++++
 .../org/apache/samza/tools/SamzaSqlConsole.java |  10 +-
 41 files changed, 2256 insertions(+), 383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 44a6ccd..d96ec96 100644
--- a/build.gradle
+++ b/build.gradle
@@ -316,11 +316,12 @@ project(':samza-sql') {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-kafka_$scalaVersion")
+    compile project(":samza-kv-inmemory_$scalaVersion")
+    compile project(":samza-kv-rocksdb_$scalaVersion")
     compile "org.apache.avro:avro:$avroVersion"
     compile "org.apache.calcite:calcite-core:$calciteVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
 
-    testCompile project(":samza-test_$scalaVersion")
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
 
@@ -753,6 +754,7 @@ project(":samza-test_$scalaVersion") {
     compile project(":samza-kv-inmemory_$scalaVersion")
     compile project(":samza-kv-rocksdb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
+    compile project(":samza-sql")
     runtime project(":samza-log4j")
     runtime project(":samza-yarn_$scalaVersion")
     runtime project(":samza-kafka_$scalaVersion")
@@ -769,6 +771,7 @@ project(":samza-test_$scalaVersion") {
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+    testCompile project(":samza-sql").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 5c5ee84..9529581 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -153,6 +153,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
 
       String executionPlanJson = plan.getPlanAsJson();
       writePlanJsonFile(executionPlanJson);
+      LOG.info("Execution Plan: \n" + executionPlanJson);
 
       // 2. create the necessary streams
       // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 31f9b92..db86969 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -50,6 +50,7 @@ object StreamConfig {
   val IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED
   val PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY
   val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT
+  val BOOTSTRAP_FOR_STREAM_ID = STREAM_ID_PREFIX + BOOTSTRAP
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
new file mode 100644
index 0000000..f646d9a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
@@ -0,0 +1,81 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A serializable class that holds different key parts.
+ */
+public class SamzaSqlCompositeKey implements Serializable {
+
+  @JsonProperty("keyParts")
+  private ArrayList<Object> keyParts;
+  private int hashCode;
+
+  @JsonCreator
+  public SamzaSqlCompositeKey(@JsonProperty("keyParts") List<Object> keyParts) {
+    this.keyParts = new ArrayList<>(keyParts);
+    hashCode = keyParts.hashCode();
+  }
+
+  /**
+   * Get the keyParts of all the columns in the relational message.
+   * @return the keyParts of all the columns
+   */
+  @JsonProperty("keyParts")
+  public ArrayList<Object> getKeyParts() {
+    return keyParts;
+  }
+
+  @Override
+  public String toString() {
+    return String.join(", ", Arrays.toString(keyParts.toArray()));
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return this == o || o != null && getClass() == o.getClass() && keyParts.equals(((SamzaSqlCompositeKey) o).keyParts);
+  }
+
+  /**
+   * Create the SamzaSqlCompositeKey from the rel message.
+   * @param message Represents the samza sql rel message.
+   * @param relIdx list of keys in the form of field indices within the rel message.
+   */
+  public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
+    ArrayList<Object> keyParts = new ArrayList<>();
+    for (int idx : relIdx) {
+      keyParts.add(message.getFieldValues().get(idx));
+    }
+    return new SamzaSqlCompositeKey(keyParts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
index 88bcb61..b0c30dd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -58,4 +58,8 @@ public class SamzaSqlExecutionContext {
     scalarUdf.init(udfConfig);
     return scalarUdf;
   }
+
+  public SamzaSqlApplicationConfig getSamzaSqlApplicationConfig() {
+    return sqlConfig;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 452a32c..b54634f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -19,10 +19,12 @@
 
 package org.apache.samza.sql.data;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import org.apache.commons.lang.Validate;
+import org.codehaus.jackson.annotate.JsonProperty;
 
 
 /**
@@ -31,15 +33,19 @@ import org.apache.commons.lang.Validate;
  * their associated column names. Right now we donot store any other metadata other than the column name in the
  * SamzaSqlRelationalMessage, In future if we find a need, we could add additional column ddl metadata around
  * primary Key, nullability, etc.
+ * TODO: SAMZA-1619 Support serialization of nested SamzaSqlRelMessage.
  */
-public class SamzaSqlRelMessage {
+public class SamzaSqlRelMessage implements Serializable {
 
   public static final String KEY_NAME = "__key__";
 
-  private final List<Object> fieldValues = new ArrayList<>();
-  private final List<String> fieldNames = new ArrayList<>();
   private final Object key;
 
+  @JsonProperty("fieldNames")
+  private final List<String> fieldNames;
+  @JsonProperty("fieldValues")
+  private final List<Object> fieldValues;
+
   /**
    * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
    * If the field list contains KEY, then it extracts the key out of the fields to creates a
@@ -49,16 +55,20 @@ public class SamzaSqlRelMessage {
    *               delete change capture event in the stream or because of the result of the outer join or the fields
    *               themselves are null in the original stream.
    */
-  public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
+  public SamzaSqlRelMessage(@JsonProperty("fieldNames") List<String> fieldNames,
+      @JsonProperty("fieldValues") List<Object> fieldValues) {
     Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
 
+    this.fieldNames = new ArrayList<>();
+    this.fieldValues = new ArrayList<>();
+
     int keyIndex = fieldNames.indexOf(KEY_NAME);
     Object key = null;
     if (keyIndex != -1) {
       key = fieldValues.get(keyIndex);
     }
-
     this.key = key;
+
     this.fieldNames.addAll(fieldNames);
     this.fieldValues.addAll(fieldValues);
   }
@@ -74,10 +84,15 @@ public class SamzaSqlRelMessage {
    */
   public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) {
     Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
+
+    this.fieldNames = new ArrayList<>();
+    this.fieldValues = new ArrayList<>();
+
     this.key = key;
     this.fieldNames.add(KEY_NAME);
-    this.fieldNames.addAll(fieldNames);
     this.fieldValues.add(key);
+
+    this.fieldNames.addAll(fieldNames);
     this.fieldValues.addAll(fieldValues);
   }
 
@@ -85,10 +100,12 @@ public class SamzaSqlRelMessage {
    * Get the field names of all the columns in the relational message.
    * @return the field names of all columns.
    */
+  @JsonProperty("fieldNames")
   public List<String> getFieldNames() {
     return fieldNames;
   }
 
+  @JsonProperty("fieldValues")
   public List<Object> getFieldValues() {
     return this.fieldValues;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
index a2d8b0c..5348d3d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -23,14 +23,16 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.interfaces.SourceResolver;
 import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
  * Source Resolver implementation that uses static config to return a config corresponding to a system stream.
- * This Source resolver implementation supports sources of type {systemName}.{streamName}
+ * This Source resolver implementation supports sources of type {systemName}.{streamName}[.$table]
+ * {systemName}.{streamName} indicates a stream
+ * {systemName}.{streamName}.$table indicates a table
  */
 public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
 
@@ -44,6 +46,7 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
   }
 
   private class ConfigBasedSourceResolver implements SourceResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
     private final Config config;
 
     public ConfigBasedSourceResolver(Config config) {
@@ -51,19 +54,52 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
     }
 
     @Override
-    public SqlSystemStreamConfig fetchSourceInfo(String source) {
+    public SqlSystemSourceConfig fetchSourceInfo(String source) {
       String[] sourceComponents = source.split("\\.");
+      boolean isTable = false;
+
+      // This source resolver expects sources of format {systemName}.{streamName}[.$table]
+      //  * First source part is always system name.
+      //  * The last source part could be either a "$table" keyword or stream name. If it is "$table", then stream name
+      //    should be the one before the last source part.
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      boolean invalidQuery = false;
 
-      // This source resolver expects sources of format {systemName}.{streamName}
       if (sourceComponents.length != 2) {
-        String msg = String.format("Source %s is not of the format {systemName}.{streamName{", source);
+        if (sourceComponents.length != 3 ||
+            !sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+          invalidQuery = true;
+        }
+      } else {
+        if (sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) ||
+            sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+          invalidQuery = true;
+        }
+      }
+
+      if (invalidQuery) {
+        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
+            SAMZA_SQL_QUERY_TABLE_KEYWORD);
         LOG.error(msg);
         throw new SamzaException(msg);
       }
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        isTable = true;
+        streamIdx = endIdx - 1;
+      }
+
       String systemName = sourceComponents[0];
-      String streamName = sourceComponents[1];
+      String streamName = sourceComponents[streamIdx];
+
+      return new SqlSystemSourceConfig(systemName, streamName, fetchSystemConfigs(systemName), isTable);
+    }
 
-      return new SqlSystemStreamConfig(systemName, streamName, fetchSystemConfigs(systemName));
+    @Override
+    public boolean isTable(String sourceName) {
+      String[] sourceComponents = sourceName.split("\\.");
+      return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
     }
 
     private Config fetchSystemConfigs(String systemName) {

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
index ac3fd31..c161a0d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
@@ -20,7 +20,7 @@
 package org.apache.samza.sql.interfaces;
 
 /**
- * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemStreamConfig} corresponding to the source.
+ * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemSourceConfig} corresponding to the source.
  */
 public interface SourceResolver {
   /**
@@ -30,5 +30,15 @@ public interface SourceResolver {
    * @return
    *  System stream config corresponding to the source.
    */
-  SqlSystemStreamConfig fetchSourceInfo(String sourceName);
+  SqlSystemSourceConfig fetchSourceInfo(String sourceName);
+
+  /**
+   * Returns if a given source is a table. Different source resolvers could have different notations in the source
+   * name for denoting a table. Eg: system.stream.$table
+   * @param sourceName
+   *  source that needs to be checked if it is a table.
+   * @return
+   *  true if the source is a table, else false.
+   */
+  boolean isTable(String sourceName);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
new file mode 100644
index 0000000..02ec18a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
@@ -0,0 +1,129 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configs associated with a system source. Both streams and table sources are supported.
+ * For now, only local tables are supported.
+ */
+public class SqlSystemSourceConfig {
+
+  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+  private final String systemName;
+
+  private final String streamName;
+
+  private final String samzaRelConverterName;
+  private final SystemStream systemStream;
+
+  private final String source;
+  private String relSchemaProviderName;
+
+  private Config config;
+
+  private List<String> sourceParts;
+
+  public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, false);
+  }
+
+  public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig, boolean isTable) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, isTable);
+  }
+
+  public SqlSystemSourceConfig(String systemName, String streamName, List<String> sourceParts,
+      Config systemConfig, boolean isTable) {
+
+
+    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.source = getSourceFromSourceParts(sourceParts);
+    this.sourceParts = sourceParts;
+    this.systemStream = new SystemStream(systemName, streamName);
+
+    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
+    Validate.notEmpty(samzaRelConverterName,
+        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+
+    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
+
+    // Removing the Samza SQL specific configs to get the remaining Samza configs.
+    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
+    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
+
+    // Currently, only local table is supported. And it is assumed that all tables are local tables.
+    if (isTable) {
+      streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true");
+      streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest");
+    }
+
+    config = new MapConfig(streamConfigs);
+  }
+
+  public static String getSourceFromSourceParts(List<String> sourceParts) {
+    return Joiner.on(".").join(sourceParts);
+  }
+
+  public List<String> getSourceParts() {
+    return sourceParts;
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getSamzaRelConverterName() {
+    return samzaRelConverterName;
+  }
+
+  public String getRelSchemaProviderName() {
+    return relSchemaProviderName;
+  }
+
+  public SystemStream getSystemStream() {
+    return systemStream;
+  }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public String getSource() {
+    return source;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
deleted file mode 100644
index d8965a4..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * Configs associated with a system stream.
- */
-public class SqlSystemStreamConfig {
-
-  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
-  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
-
-  private final String systemName;
-
-  private final String streamName;
-
-  private final String samzaRelConverterName;
-  private final SystemStream systemStream;
-
-  private final String source;
-  private String relSchemaProviderName;
-
-  private Config config;
-
-  private List<String> sourceParts;
-
-  public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
-    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig);
-  }
-
-  public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
-      Config systemConfig) {
-
-
-    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
-    this.systemName = systemName;
-    this.streamName = streamName;
-    this.source = getSourceFromSourceParts(sourceParts);
-    this.sourceParts = sourceParts;
-    this.systemStream = new SystemStream(systemName, streamName);
-
-    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
-    Validate.notEmpty(samzaRelConverterName,
-        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
-
-    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
-
-    // Removing the Samza SQL specific configs to get the remaining Samza configs.
-    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
-    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
-
-    config = new MapConfig(streamConfigs);
-  }
-
-  public static String getSourceFromSourceParts(List<String> sourceParts) {
-    return Joiner.on(".").join(sourceParts);
-  }
-
-  public List<String> getSourceParts() {
-    return sourceParts;
-  }
-
-  public String getSystemName() {
-    return systemName;
-  }
-
-  public String getStreamName() {
-    return streamName;
-  }
-
-  public String getSamzaRelConverterName() {
-    return samzaRelConverterName;
-  }
-
-  public String getRelSchemaProviderName() {
-    return relSchemaProviderName;
-  }
-
-  public SystemStream getSystemStream() {
-    return systemStream;
-  }
-
-  public Config getConfig() {
-    return config;
-  }
-
-  public String getSource() {
-    return source;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 2b67f18..f21eccf 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -55,7 +55,7 @@ import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,11 +72,11 @@ public class QueryPlanner {
   // Mapping between the source to the RelSchemaProvider corresponding to the source.
   private final Map<String, RelSchemaProvider> relSchemaProviders;
 
-  // Mapping between the source to the SqlSystemStreamConfig corresponding to the source.
-  private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
+  // Mapping between the source to the SqlSystemSourceConfig corresponding to the source.
+  private final Map<String, SqlSystemSourceConfig> systemStreamConfigBySource;
 
   public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
-      Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
+      Map<String, SqlSystemSourceConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
     this.relSchemaProviders = relSchemaProviders;
     this.systemStreamConfigBySource = systemStreamConfigBySource;
     this.udfMetadata = udfMetadata;
@@ -88,7 +88,7 @@ public class QueryPlanner {
       CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
 
-      for (SqlSystemStreamConfig ssc : systemStreamConfigBySource.values()) {
+      for (SqlSystemSourceConfig ssc : systemStreamConfigBySource.values()) {
         SchemaPlus previousLevelSchema = rootSchema;
         List<String> sourceParts = ssc.getSourceParts();
         RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());
@@ -96,7 +96,7 @@ public class QueryPlanner {
         for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
           String sourcePart = sourceParts.get(sourcePartIndex);
           if (sourcePartIndex < sourceParts.size() - 1) {
-            SchemaPlus sourcePartSchema = rootSchema.getSubSchema(sourcePart);
+            SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
             if (sourcePartSchema == null) {
               sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
             }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 227a0f1..aeb7f35 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -42,7 +42,7 @@ import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
 import org.apache.samza.sql.interfaces.SourceResolver;
 import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.interfaces.UdfResolver;
 import org.apache.samza.sql.testutil.JsonUtil;
@@ -50,7 +50,6 @@ import org.apache.samza.sql.testutil.ReflectionUtils;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
 import org.apache.samza.sql.testutil.SqlFileParser;
-import org.apache.samza.system.SystemStream;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,8 +85,8 @@ public class SamzaSqlApplicationConfig {
 
   private final Collection<UdfMetadata> udfMetadata;
 
-  private final Map<String, SqlSystemStreamConfig> inputSystemStreamConfigBySource;
-  private final Map<String, SqlSystemStreamConfig> outputSystemStreamConfigsBySource;
+  private final Map<String, SqlSystemSourceConfig> inputSystemStreamConfigBySource;
+  private final Map<String, SqlSystemSourceConfig> outputSystemStreamConfigsBySource;
 
   private final List<String> sql;
 
@@ -109,7 +108,7 @@ public class SamzaSqlApplicationConfig {
         .flatMap(Collection::stream)
         .collect(Collectors.toMap(Function.identity(), sourceResolver::fetchSourceInfo));
 
-    Set<SqlSystemStreamConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
+    Set<SqlSystemSourceConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
 
     outputSystemStreamConfigsBySource = queryInfo.stream()
         .map(QueryInfo::getOutputSource)
@@ -117,13 +116,13 @@ public class SamzaSqlApplicationConfig {
     systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
 
     relSchemaProvidersBySource = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
+        .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
             x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
                 CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
                 (o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
 
     samzaRelConvertersBySource = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
+        .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
             x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
                 CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
                     relSchemaProvidersBySource.get(x.getSource()), c))));
@@ -226,11 +225,11 @@ public class SamzaSqlApplicationConfig {
     return udfMetadata;
   }
 
-  public Map<String, SqlSystemStreamConfig> getInputSystemStreamConfigBySource() {
+  public Map<String, SqlSystemSourceConfig> getInputSystemStreamConfigBySource() {
     return inputSystemStreamConfigBySource;
   }
 
-  public Map<String, SqlSystemStreamConfig> getOutputSystemStreamConfigsBySource() {
+  public Map<String, SqlSystemSourceConfig> getOutputSystemStreamConfigsBySource() {
     return outputSystemStreamConfigsBySource;
   }
 
@@ -241,4 +240,8 @@ public class SamzaSqlApplicationConfig {
   public Map<String, RelSchemaProvider> getRelSchemaProviders() {
     return relSchemaProvidersBySource;
   }
+
+  public SourceResolver getSourceResolver() {
+    return sourceResolver;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 83928e1..f54ca42 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -32,7 +32,7 @@ import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
 import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,13 +82,13 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
     for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
       // Populate stream to system mapping config for input and output system streams
       for (String inputSource : query.getInputSources()) {
-        SqlSystemStreamConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
+        SqlSystemSourceConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
         newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
             inputSystemStreamConfig.getSystemName());
         newConfig.putAll(inputSystemStreamConfig.getConfig());
       }
 
-      SqlSystemStreamConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
+      SqlSystemSourceConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
       newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
           outputSystemStreamConfig.getSystemName());
       newConfig.putAll(outputSystemStreamConfig.getConfig());

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
index dd5f3bc..faf903a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -23,7 +23,6 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -102,14 +101,14 @@ public class SamzaSqlQueryParser {
 
     String outputSource;
     String selectQuery;
-    String inputSource;
+    ArrayList<String> inputSources;
     if (sqlNode instanceof SqlInsert) {
       SqlInsert sqlInsert = ((SqlInsert) sqlNode);
       outputSource = sqlInsert.getTargetTable().toString();
       if (sqlInsert.getSource() instanceof SqlSelect) {
         SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
         selectQuery = m.group(2);
-        inputSource = getInputFromSelectQuery(sqlSelect);
+        inputSources = getInputsFromSelectQuery(sqlSelect);
       } else {
         throw new SamzaException("Sql query is not of the expected format");
       }
@@ -117,7 +116,7 @@ public class SamzaSqlQueryParser {
       throw new SamzaException("Sql query is not of the expected format");
     }
 
-    return new QueryInfo(selectQuery, Collections.singletonList(inputSource), outputSource);
+    return new QueryInfo(selectQuery, inputSources, outputSource);
   }
 
   private static Planner createPlanner() {
@@ -147,17 +146,17 @@ public class SamzaSqlQueryParser {
     return Frameworks.getPlanner(frameworkConfig);
   }
 
-  private static String getInputFromSelectQuery(SqlSelect sqlSelect) {
+  private static ArrayList<String> getInputsFromSelectQuery(SqlSelect sqlSelect) {
     ArrayList<String> input = new ArrayList<>();
     getInput(sqlSelect.getFrom(), input);
-    if (input.size() != 1) {
+    if (input.size() < 1) {
       throw new SamzaException("Unsupported query " + sqlSelect);
     }
 
-    return input.get(0);
+    return input;
   }
 
-  private static void getInput(SqlNode node, ArrayList<String> inputSource) {
+  private static void getInput(SqlNode node, ArrayList<String> inputSourceList) {
     if (node instanceof SqlJoin) {
       SqlJoin joinNode = (SqlJoin) node;
       ArrayList<String> inputsLeft = new ArrayList<>();
@@ -165,24 +164,20 @@ public class SamzaSqlQueryParser {
       getInput(joinNode.getLeft(), inputsLeft);
       getInput(joinNode.getRight(), inputsRight);
 
-      if (!inputsLeft.isEmpty() && !inputsRight.isEmpty()) {
-        throw new SamzaException("Joins on two entities are not supported yet");
-      }
-
-      inputSource.addAll(inputsLeft);
-      inputSource.addAll(inputsRight);
+      inputSourceList.addAll(inputsLeft);
+      inputSourceList.addAll(inputsRight);
     } else if (node instanceof SqlIdentifier) {
-      inputSource.add(node.toString());
+      inputSourceList.add(node.toString());
     } else if (node instanceof SqlBasicCall) {
       SqlBasicCall basicCall = ((SqlBasicCall) node);
       if (basicCall.getOperator() instanceof SqlAsOperator) {
-        getInput(basicCall.operand(0), inputSource);
+        getInput(basicCall.operand(0), inputSourceList);
       } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
-        inputSource.add(getInputFromSelectQuery(basicCall.operand(0)));
+        inputSourceList.addAll(getInputsFromSelectQuery(basicCall.operand(0)));
         return;
       }
     } else if (node instanceof SqlSelect) {
-      getInput(((SqlSelect) node).getFrom(), inputSource);
+      getInput(((SqlSelect) node).getFrom(), inputSourceList);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 686ac15..798f0b3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -22,7 +22,10 @@ package org.apache.samza.sql.translator;
 import java.util.Arrays;
 import java.util.Collections;
 
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexNode;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
@@ -34,16 +37,23 @@ import org.slf4j.LoggerFactory;
  * Translator to translate the LogicalFilter node in the relational graph to the corresponding StreamGraph
  * implementation
  */
-public class FilterTranslator {
+class FilterTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(FilterTranslator.class);
 
-  public void translate(final LogicalFilter filter, final TranslatorContext context) {
+  void translate(final LogicalFilter filter, final TranslatorContext context) {
     MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(filter.getInput().getId());
+    MessageStream<SamzaSqlRelMessage> outputStream = translateFilter(inputStream, filter.getInputs(),
+        filter.getCondition(), context);
+    context.registerMessageStream(filter.getId(), outputStream);
+  }
+
+  static MessageStream<SamzaSqlRelMessage> translateFilter(MessageStream<SamzaSqlRelMessage> inputStream,
+      List<RelNode> inputs, RexNode condition, final TranslatorContext context) {
     Expression expr =
-        context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
+        context.getExpressionCompiler().compile(inputs, Collections.singletonList(condition));
 
-    MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(message -> {
+    return inputStream.filter(message -> {
       Object[] result = new Object[1];
       expr.execute(context.getExecutionContext(), context.getDataContext(), message.getFieldValues().toArray(), result);
       if (result.length > 0 && result[0] instanceof Boolean) {
@@ -56,7 +66,5 @@ public class FilterTranslator {
         return false;
       }
     });
-
-    context.registerMessageStream(filter.getId(), outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
new file mode 100644
index 0000000..70c1968
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -0,0 +1,279 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.table.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+
+
+/**
+ * Translator to translate the LogicalJoin node in the relational graph to the corresponding StreamGraph
+ * implementation.
+ * Join is supported with the following caveats:
+ *   0. Only local tables are supported. Remote/composite tables are not yet supported.
+ *   1. Only stream-table joins are supported. No stream-stream joins.
+ *   2. Only Equi-joins are supported. No theta-joins.
+ *   3. Inner joins, Left and Right outer joins are supported. No cross joins, full outer joins or natural joins.
+ *   4. Join condition with a constant is not supported.
+ *   5. Compound join condition with only AND operator is supported. AND operator with a constant is not supported. No
+ *      support for OR operator or any other operator in the join condition.
+ *   6. Join condition with UDFs is not supported. Eg: udf1(a.key) = udf2(b.key) is not supported.
+ *
+ * It is assumed that the stream denoted as 'table' is already partitioned by the key(s) specified in the join
+ * condition. We do not repartition the table as bootstrap semantic is not propagated to the intermediate streams.
+ * Please refer SAMZA-1613 for more details on this. But we always repartition the stream by the key(s) specified in
+ * the join condition.
+ */
+class JoinTranslator {
+
+  private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
+  private int joinId;
+  private SourceResolver sourceResolver;
+
+  JoinTranslator(int joinId, SourceResolver sourceResolver) {
+    this.joinId = joinId;
+    this.sourceResolver = sourceResolver;
+  }
+
+  void translate(final LogicalJoin join, final TranslatorContext context) {
+
+    // Do the validation of join query
+    validateJoinQuery(join);
+
+    boolean isTablePosOnRight = isTable(join.getRight());
+    List<Integer> streamKeyIds = new LinkedList<>();
+    List<Integer> tableKeyIds = new LinkedList<>();
+
+    // Fetch the stream and table indices corresponding to the fields given in the join condition.
+    populateStreamAndTableKeyIds(((RexCall) join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds,
+        tableKeyIds);
+
+    JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+    JsonSerdeV2<SamzaSqlRelMessage> relMsgSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+
+    Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, relMsgSerde, join, context);
+
+    MessageStream<SamzaSqlRelMessage> inputStream =
+        isTablePosOnRight ?
+            context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId());
+
+    List<String> streamFieldNames = (isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames();
+    List<String> tableFieldNames = (isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames();
+    Validate.isTrue(streamKeyIds.size() == tableKeyIds.size());
+    log.info("Joining on the following Stream and Table field(s): ");
+    for (int i = 0; i < streamKeyIds.size(); i++) {
+      log.info(streamFieldNames.get(streamKeyIds.get(i)) + " with " + tableFieldNames.get(tableKeyIds.get(i)));
+    }
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
+            tableFieldNames);
+
+    // Always re-partition the messages from the input stream by the composite key and then join the messages
+    // with the table.
+    MessageStream<SamzaSqlRelMessage> outputStream =
+        inputStream
+            .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
+                m -> m,
+                KVSerde.of(keySerde, relMsgSerde),
+                "stream_" + joinId)
+            .map(KV::getValue)
+            .join(table, joinFn);
+
+    context.registerMessageStream(join.getId(), outputStream);
+  }
+
+  private void validateJoinQuery(LogicalJoin join) {
+    JoinRelType joinRelType = join.getJoinType();
+
+    if (joinRelType.compareTo(JoinRelType.INNER) != 0 && joinRelType.compareTo(JoinRelType.LEFT) != 0
+        && joinRelType.compareTo(JoinRelType.RIGHT) != 0) {
+      throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported.");
+    }
+
+    boolean isTablePosOnLeft = isTable(join.getLeft());
+    boolean isTablePosOnRight = isTable(join.getRight());
+
+    if (!isTablePosOnLeft && !isTablePosOnRight) {
+      throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. "
+          + "Stream-stream join is not yet supported. " + dumpRelPlanForNode(join));
+    }
+
+    if (isTablePosOnLeft && isTablePosOnRight) {
+      throw new SamzaException("Invalid query with both sides of join being denoted as 'table'. " +
+          dumpRelPlanForNode(join));
+    }
+
+    if (joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnLeft && !isTablePosOnRight) {
+      throw new SamzaException("Invalid query for outer left join. Left side of the join should be a 'stream' and "
+          + "right side of join should be a 'table'. " + dumpRelPlanForNode(join));
+    }
+
+    if (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && isTablePosOnRight && !isTablePosOnLeft) {
+      throw new SamzaException("Invalid query for outer right join. Left side of the join should be a 'table' and "
+          + "right side of join should be a 'stream'. " + dumpRelPlanForNode(join));
+    }
+
+    validateJoinCondition(join.getCondition());
+  }
+
+  private void validateJoinCondition(RexNode operand) {
+    if (!(operand instanceof RexCall)) {
+      throw new SamzaException("SQL Query is not supported. Join condition operand " + operand +
+          " is of type " + operand.getClass());
+    }
+
+    RexCall condition = (RexCall) operand;
+
+    if (condition.isAlwaysTrue()) {
+      throw new SamzaException("Query results in a cross join, which is not supported. Please optimize the query."
+          + " It is expected that the joins should include JOIN ON operator in the sql query.");
+    }
+
+    if (condition.getKind() != SqlKind.EQUALS && condition.getKind() != SqlKind.AND) {
+      throw new SamzaException("Only equi-joins and AND operator is supported in join condition.");
+    }
+  }
+
+  // Fetch the stream and table key indices corresponding to the fields given in the join condition by parsing through
+  // the condition. Stream and table key indices are populated in streamKeyIds and tableKeyIds respectively.
+  private void populateStreamAndTableKeyIds(List<RexNode> operands, final LogicalJoin join, boolean isTablePosOnRight,
+      List<Integer> streamKeyIds, List<Integer> tableKeyIds) {
+
+    // All non-leaf operands in the join condition should be expressions.
+    if (operands.get(0) instanceof RexCall) {
+      operands.forEach(operand -> {
+        validateJoinCondition(operand);
+        populateStreamAndTableKeyIds(((RexCall) operand).getOperands(), join, isTablePosOnRight, streamKeyIds, tableKeyIds);
+      });
+      return;
+    }
+
+    // We are at the leaf of the join condition. Only binary operators are supported.
+    Validate.isTrue(operands.size() == 2);
+
+    // Only reference operands are supported in row expressions and not constants.
+    // a.key = b.key is supported with a.key and b.key being reference operands.
+    // a.key = "constant" is not yet supported.
+    if (!(operands.get(0) instanceof RexInputRef) || !(operands.get(1) instanceof RexInputRef)) {
+      throw new SamzaException("SQL query is not supported. Join condition " + join.getCondition() + " should have "
+          + "reference operands but the types are " + operands.get(0).getClass() + " and " + operands.get(1).getClass());
+    }
+
+    // Join condition is commutative, meaning, a.key = b.key is equivalent to b.key = a.key.
+    // Calcite assigns the indices to the fields based on the order a and b are specified in
+    // the sql 'from' clause. Let's put the operand with smaller index in leftRef and larger
+    // index in rightRef so that the order of operands in the join condition is in the order
+    // the stream and table are specified in the 'from' clause.
+    RexInputRef leftRef = (RexInputRef) operands.get(0);
+    RexInputRef rightRef = (RexInputRef) operands.get(1);
+
+    // Let's validate the key used in the join condition.
+    validateKey(leftRef);
+    validateKey(rightRef);
+
+    if (leftRef.getIndex() > rightRef.getIndex()) {
+      RexInputRef tmpRef = leftRef;
+      leftRef = rightRef;
+      rightRef = tmpRef;
+    }
+
+    // Get the table key index and stream key index
+    int deltaKeyIdx = rightRef.getIndex() - join.getLeft().getRowType().getFieldCount();
+    streamKeyIds.add(isTablePosOnRight ? leftRef.getIndex() : deltaKeyIdx);
+    tableKeyIds.add(isTablePosOnRight ? deltaKeyIdx : leftRef.getIndex());
+  }
+
+  private void validateKey(RexInputRef ref) {
+    SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
+    // Only primitive types are supported in the key
+    if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
+        && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
+        && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT) {
+      log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
+      throw new SamzaException("Unsupported key type used in join condition.");
+    }
+  }
+
+  private String dumpRelPlanForNode(RelNode relNode) {
+    return RelOptUtil.dumpPlan("Rel expression: ",
+        relNode, SqlExplainFormat.TEXT,
+        SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+  }
+
+  private boolean isTable(RelNode relNode) {
+    // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
+    // stream-table-table join, the left side of the join is join output, which we always
+    // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan.
+    return relNode instanceof EnumerableTableScan &&
+        sourceResolver.isTable(String.join(".", relNode.getTable().getQualifiedName()));
+  }
+
+  private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, Serde keySerde, Serde relMsgSerde,
+      LogicalJoin join, TranslatorContext context) {
+    MessageStream<SamzaSqlRelMessage> inputTable =
+        isTablePosOnRight ?
+            context.getMessageStream(join.getRight().getId()) : context.getMessageStream(join.getLeft().getId());
+
+    // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
+    // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
+    Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
+        context.getStreamGraph()
+            .getTable(new RocksDbTableDescriptor("table_" + joinId)
+                .withSerde(KVSerde.of(keySerde, relMsgSerde)));
+
+    inputTable
+        .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
+        .sendTo(table);
+
+    return table;
+  }
+
+  private void logStringAndTableJoinKeys(List<String> fieldNames, List<Integer> fieldIds) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index f5cc525..0f31fb6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -42,11 +42,11 @@ import org.slf4j.LoggerFactory;
  * Translator to translate the Project node in the relational graph to the corresponding StreamGraph
  * implementation.
  */
-public class ProjectTranslator {
+class ProjectTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class);
 
-  public void translate(final Project project, final TranslatorContext context) {
+  void translate(final Project project, final TranslatorContext context) {
     MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId());
     List<Integer> flattenProjects =
         project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 87e37f4..b853537 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -32,7 +33,8 @@ import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
@@ -62,6 +64,7 @@ public class QueryTranslator {
     final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
     final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext);
     final RelNode node = relRoot.project();
+    final int[] joinId = new int[1];
 
     node.accept(new RelShuttleImpl() {
       @Override
@@ -84,9 +87,18 @@ public class QueryTranslator {
         new ProjectTranslator().translate(project, context);
         return node;
       }
+
+      @Override
+      public RelNode visit(LogicalJoin join) {
+        RelNode node = super.visit(join);
+        joinId[0]++;
+        SourceResolver sourceResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getSourceResolver();
+        new JoinTranslator(joinId[0], sourceResolver).translate(join, context);
+        return node;
+      }
     });
 
-    SqlSystemStreamConfig outputSystemConfig =
+    SqlSystemSourceConfig outputSystemConfig =
         sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
     SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource());
     MessageStreamImpl<SamzaSqlRelMessage> stream =

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
new file mode 100644
index 0000000..69e4e09
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+
+
+/**
+ * This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key
+ * being {@link SamzaSqlCompositeKey}
+ */
+public class SamzaSqlRelMessageJoinFunction
+    implements StreamTableJoinFunction<SamzaSqlCompositeKey, SamzaSqlRelMessage, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
+
+  private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class);
+
+  private final JoinRelType joinRelType;
+  private final boolean isTablePosOnRight;
+  private final List<Integer> streamFieldIds;
+  // Table field names are used in the outer join when the table record is not found.
+  private final List<String> tableFieldNames;
+  private final List<String> outFieldNames;
+
+  public SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight,
+      List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) {
+    this.joinRelType = joinRelType;
+    this.isTablePosOnRight = isTablePosOnRight;
+    Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) ||
+        (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) ||
+        joinRelType.compareTo(JoinRelType.INNER) == 0);
+    this.streamFieldIds = streamFieldIds;
+    this.tableFieldNames = tableFieldNames;
+    this.outFieldNames = new ArrayList<>();
+    if (isTablePosOnRight) {
+      outFieldNames.addAll(streamFieldNames);
+    }
+    outFieldNames.addAll(tableFieldNames);
+    if (!isTablePosOnRight) {
+      outFieldNames.addAll(streamFieldNames);
+    }
+  }
+
+  @Override
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+
+    if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) {
+      log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
+      // Returning null would result in Join operator implementation to filter out the message.
+      return null;
+    }
+
+    // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and
+    // table record. The order of stream message fields and table record fields are dictated by the position of stream
+    // and table in the 'from' clause of sql query. The output should also include the keys from both the stream message
+    // and the table record.
+    List<Object> outFieldValues = new ArrayList<>();
+
+    // If table position is on the right, add the stream message fields first
+    if (isTablePosOnRight) {
+      outFieldValues.addAll(message.getFieldValues());
+    }
+
+    // Add the table record fields.
+    if (record != null) {
+      outFieldValues.addAll(record.getValue().getFieldValues());
+    } else {
+      // Table record could be null as the record could not be found in the store. This can
+      // happen for outer joins. Add nulls to all the field values in the output message.
+      tableFieldNames.forEach(s -> outFieldValues.add(null));
+    }
+
+    // If table position is on the left, add the stream message fields last
+    if (!isTablePosOnRight) {
+      outFieldValues.addAll(message.getFieldValues());
+    }
+
+    return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
+  }
+
+  @Override
+  public SamzaSqlCompositeKey getMessageKey(SamzaSqlRelMessage message) {
+    return createSamzaSqlCompositeKey(message, streamFieldIds);
+  }
+
+  @Override
+  public SamzaSqlCompositeKey getRecordKey(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+    return record.getKey();
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 30e5a9b..13300f7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -28,27 +28,27 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 
 
 /**
  * Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
  * implementation
  */
-public class ScanTranslator {
+class ScanTranslator {
 
   private final Map<String, SamzaRelConverter> relMsgConverters;
-  private final Map<String, SqlSystemStreamConfig> systemStreamConfig;
+  private final Map<String, SqlSystemSourceConfig> systemStreamConfig;
 
-  public ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemStreamConfig> ssc) {
+  ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemSourceConfig> ssc) {
     relMsgConverters = converters;
     this.systemStreamConfig = ssc;
   }
 
-  public void translate(final TableScan tableScan, final TranslatorContext context) {
+  void translate(final TableScan tableScan, final TranslatorContext context) {
     StreamGraph streamGraph = context.getStreamGraph();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
-    String sourceName = SqlSystemStreamConfig.getSourceFromSourceParts(tableNameParts);
+    String sourceName = SqlSystemSourceConfig.getSourceFromSourceParts(tableNameParts);
 
     Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
     SamzaRelConverter converter = relMsgConverters.get(sourceName);