You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/16 19:15:00 UTC

[1/3] samza git commit: Samza SQL implementation for basic projects, filtering and UDFs

Repository: samza
Updated Branches:
  refs/heads/master 2e2e00ed0 -> 9fa8beed7


http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
new file mode 100644
index 0000000..0b85339
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Test;
+
+
+public class TestSamzaSqlApplicationConfig {
+  /** Checks the collection sizes exposed by a config built from the static test config plus one SQL statement. */
+  @Test
+  public void testConfigInit() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size());
+    Assert.assertEquals(3, samzaSqlApplicationConfig.getUdfMetadata().size());
+    Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
+    Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
+  }
+
+  @Test
+  public void testWrongConfigs() {
+    // Baseline config; the SQL statement is deliberately left out for the first check.
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+
+    // Fail because no SQL config
+    try {
+      new SamzaSqlApplicationConfig(new MapConfig(config));
+      Assert.fail("Expected a SamzaException when no SQL statement is configured");
+    } catch (SamzaException ignored) {
+      // Expected: the exception itself is the outcome under test.
+    }
+
+    // Pass
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
+    new SamzaSqlApplicationConfig(new MapConfig(config));
+    testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER);
+    testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
+
+    String configSourceResolverDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "testavro");
+
+    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER);
+
+    // Configs for the unused system "log" are not mandatory.
+    String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "log");
+    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER);
+  }
+  /** Copies {@code config}, removes {@code configKey} and expects construction to succeed. */
+  private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
+    Map<String, String> trimmedConfigs = new HashMap<>(config);
+    trimmedConfigs.remove(configKey);
+    new SamzaSqlApplicationConfig(new MapConfig(trimmedConfigs));
+  }
+  /** Copies {@code config}, removes {@code configKey} and expects an {@link IllegalArgumentException}. */
+  private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) {
+    Map<String, String> badConfigs = new HashMap<>(config);
+    badConfigs.remove(configKey);
+    try {
+      new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+      Assert.fail("Expected an IllegalArgumentException when config is missing: " + configKey);
+    } catch (IllegalArgumentException ignored) {
+      // Expected: the exception itself is the outcome under test.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java
new file mode 100644
index 0000000..5bac472
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.samza.sql.testutil.SqlFileParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlFileParser {
+  /** Three statements spread over several lines, plus one "--" line the parser is expected to drop. */
+  public static final String TEST_SQL =
+      "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n"
+          + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, "
+          + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n"
+          + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n"
+          + "\tfrom tracking.SamzaSqlTestTopic1_p8";
+
+  @Test
+  public void testParseSqlFile() throws IOException {
+    File tempFile = File.createTempFile("testparser", "");
+    tempFile.deleteOnExit(); // the scratch file must not outlive the test JVM
+    try (PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath())) {
+      fileWriter.println(TEST_SQL);
+    } // closed (and flushed) here even if println throws
+    List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
+    Assert.assertEquals(3, sqlStmts.size());
+    Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts",
+        sqlStmts.get(0));
+    Assert.assertEquals(
+        "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts",
+        sqlStmts.get(1));
+    Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8",
+        sqlStmts.get(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
new file mode 100644
index 0000000..97196e2
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
@@ -0,0 +1,70 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class TestSamzaSqlQueryParser {
+  /** A well-formed insert/select yields the output source, the select text and the single input source. */
+  @Test
+  public void testParseQuery() {
+    QueryInfo parsed = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
+    Assert.assertEquals("log.foo", parsed.getOutputSource());
+    Assert.assertEquals(parsed.getSelectQuery(), "select * from tracking.bar", parsed.getSelectQuery());
+    Assert.assertEquals(1, parsed.getInputSources().size());
+    Assert.assertEquals("tracking.bar", parsed.getInputSources().get(0));
+  }
+
+  /** Each malformed statement below must be rejected with a {@link SamzaException}. */
+  @Test
+  public void testParseInvalidQuery() {
+    // No "insert into" target at all.
+    assertParseFails("select * from tracking.bar");
+
+    // "insert into" without an output source name.
+    assertParseFails("insert into select * from tracking.bar");
+
+    // "select" without a projection list.
+    assertParseFails("insert into log.off select from tracking.bar");
+  }
+
+  /** A select over two comma-separated sources is expected to be rejected. */
+  @Test
+  public void testParseJoin() {
+    assertParseFails("insert into log.foo select * from tracking.bar1,tracking.bar2");
+  }
+
+  /**
+   * Parses {@code sql} and fails the test unless the parser throws a {@link SamzaException}.
+   */
+  private static void assertParseFails(String sql) {
+    try {
+      SamzaSqlQueryParser.parseQuery(sql);
+      Assert.fail("Expected a samzaException");
+    } catch (SamzaException expected) {
+      // The exception is the outcome under test; nothing further to verify.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
new file mode 100644
index 0000000..3290b96
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRelMessage {
+  // Parallel fixtures: names.get(i) is the field name under which values.get(i) is stored.
+  private final List<Object> values = Arrays.asList("value1", "value2");
+  private final List<String> names = Arrays.asList("field1", "field2");
+
+  @Test
+  public void testGetField() {
+    SamzaSqlRelMessage relMessage = SamzaSqlRelMessage.createRelMessage(values, names);
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(values.get(i), relMessage.getField(names.get(i)).get());
+    }
+  }
+
+  @Test
+  public void testGetNonExistentField() {
+    Assert.assertFalse(SamzaSqlRelMessage.createRelMessage(values, names).getField("field3").isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
new file mode 100644
index 0000000..a54db54
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -0,0 +1,239 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import com.google.common.base.Joiner;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.system.SystemStream;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestAvroRelConversion {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestAvroRelConversion.class);
+  private final AvroRelConverter simpleRecordAvroRelConverter;
+  private final AvroRelConverter complexRecordAvroRelConverter;
+  private final AvroRelSchemaProvider simpleRecordSchemaProvider;
+  private final AvroRelSchemaProvider complexRecordSchemaProvider;
+
+  // Fixture values written into the ComplexRecord under test and compared again after conversion.
+  private final int id = 1;
+  private final boolean boolValue = true;
+  private final double doubleValue = 0.6;
+  private final float floatValue = 0.6f;
+  private final String testStrValue = "testString";
+  // Explicit charset so the fixture bytes do not depend on the platform default encoding.
+  private final ByteBuffer testBytes = ByteBuffer.wrap("testBytes".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+  private final long longValue = 200L;
+
+  private final HashMap<String, String> mapValue = new HashMap<String, String>() {{
+    put("key1", "val1");
+    put("key2", "val2");
+    put("key3", "val3");
+  }};
+  private final List<String> arrayValue = Arrays.asList("val1", "val2", "val3");
+
+  /** Registers both Avro schemas in a config map and builds one schema provider and converter per stream. */
+  public TestAvroRelConversion() {
+    Map<String, String> props = new HashMap<>();
+    SystemStream ss1 = new SystemStream("test", "complexRecord");
+    SystemStream ss2 = new SystemStream("test", "simpleRecord");
+    props.put(
+        String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()),
+        ComplexRecord.SCHEMA$.toString());
+    props.put(
+        String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss2.getSystem(), ss2.getStream()),
+        SimpleRecord.SCHEMA$.toString());
+
+    ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory();
+
+    complexRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props));
+    simpleRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss2, new MapConfig(props));
+    complexRecordAvroRelConverter = new AvroRelConverter(ss1, complexRecordSchemaProvider, new MapConfig());
+    simpleRecordAvroRelConverter = new AvroRelConverter(ss2, simpleRecordSchemaProvider, new MapConfig());
+  }
+
+  /** SimpleRecord must map to a record type with one field per Avro field, INTEGER "id" and VARCHAR "name". */
+  @Test
+  public void testSimpleSchemaConversion() {
+    RelDataType dataType = simpleRecordSchemaProvider.getRelationalSchema();
+    Assert.assertTrue(dataType instanceof RelRecordType);
+    RelRecordType recordType = (RelRecordType) dataType;
+
+    Assert.assertEquals(SimpleRecord.SCHEMA$.getFields().size(), recordType.getFieldCount());
+    Assert.assertEquals(SqlTypeName.INTEGER, recordType.getField("id", true, false).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, recordType.getField("name", true, false).getType().getSqlTypeName());
+
+    LOG.info("Relational schema {}", dataType);
+  }
+
+  /** Smoke test: only checks that the complex schema converts without throwing; the result is logged. */
+  @Test
+  public void testComplexSchemaConversion() {
+    RelDataType relSchema = complexRecordSchemaProvider.getRelationalSchema();
+
+    LOG.info("Relational schema {}", relSchema);
+  }
+
+  /** Smoke test: converts a hand-built SimpleRecord and logs the resulting field values and names. */
+  @Test
+  public void testSimpleRecordConversion() {
+    GenericData.Record record = new GenericData.Record(SimpleRecord.SCHEMA$);
+    record.put("id", 1);
+    record.put("name", "name1");
+
+    SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
+    LOG.info(Joiner.on(",").join(message.getFieldValues()));
+    LOG.info(Joiner.on(",").join(message.getFieldNames()));
+  }
+
+  /** Binary-encodes {@code record} with a {@link SpecificDatumWriter} created for {@code clazz}. */
+  public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) throws IOException {
+    DatumWriter<T> msgDatumWriter = new SpecificDatumWriter<>(clazz);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    msgDatumWriter.write(record, encoder);
+    encoder.flush();
+    return os.toByteArray();
+  }
+
+  /** Encodes one ComplexRecord both as a generic and as a specific record and validates each encoding. */
+  @Test
+  public void testComplexRecordConversion() throws IOException {
+    GenericData.Record record = new GenericData.Record(ComplexRecord.SCHEMA$);
+    record.put("id", id);
+    record.put("bool_value", boolValue);
+    record.put("double_value", doubleValue);
+    record.put("float_value", floatValue);
+    record.put("string_value", testStrValue);
+    record.put("bytes_value", testBytes);
+    record.put("long_value", longValue);
+    record.put("array_values", arrayValue);
+    record.put("map_values", mapValue);
+
+    // Same field values again, this time through the generated specific-record class.
+    ComplexRecord complexRecord = new ComplexRecord();
+    complexRecord.id = id;
+    complexRecord.bool_value = boolValue;
+    complexRecord.double_value = doubleValue;
+    complexRecord.float_value = floatValue;
+    complexRecord.string_value = testStrValue;
+    complexRecord.bytes_value = testBytes;
+    complexRecord.long_value = longValue;
+    complexRecord.array_values = new ArrayList<>();
+    complexRecord.array_values.addAll(arrayValue);
+    complexRecord.map_values = new HashMap<>();
+    complexRecord.map_values.putAll(mapValue);
+
+    byte[] serializedData = bytesFromGenericRecord(record);
+    validateAvroSerializedData(serializedData);
+
+    serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
+    validateAvroSerializedData(serializedData);
+  }
+
+  /** Decodes Avro binary {@code bytes} with a {@link GenericDatumReader} built for {@code schema}. */
+  private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException {
+    BinaryDecoder binDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+    GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
+    return reader.read(null, binDecoder);
+  }
+
+  /** Encodes {@code record} to Avro binary using the record's own schema. */
+  private static byte[] bytesFromGenericRecord(GenericRecord record) throws IOException {
+    DatumWriter<IndexedRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+    datumWriter.write(record, encoder);
+    encoder.flush();
+    outputStream.close();
+    return outputStream.toByteArray();
+  }
+
+  /** Decodes the bytes, checks the relational message against the fixtures, then checks the round trip. */
+  private void validateAvroSerializedData(byte[] serializedData) throws IOException {
+    GenericRecord complexRecordValue = genericRecordFromBytes(serializedData, ComplexRecord.SCHEMA$);
+
+    SamzaSqlRelMessage message = complexRecordAvroRelConverter.convertToRelMessage(new KV<>("key", complexRecordValue));
+    Assert.assertEquals(ComplexRecord.SCHEMA$.getFields().size(), message.getFieldNames().size());
+
+    // String-typed values are expected back as Utf8, hence the Utf8 wrappers in the expectations below.
+    Assert.assertEquals(id, message.getField("id").get());
+    Assert.assertEquals(boolValue, message.getField("bool_value").get());
+    Assert.assertEquals(doubleValue, message.getField("double_value").get());
+    Assert.assertEquals(new Utf8(testStrValue), message.getField("string_value").get());
+    Assert.assertEquals(floatValue, message.getField("float_value").get());
+    Assert.assertEquals(longValue, message.getField("long_value").get());
+    List<Utf8> expectedArray = arrayValue.stream().map(Utf8::new).collect(Collectors.toList());
+    Map<Utf8, Utf8> expectedMap = mapValue.entrySet()
+        .stream()
+        .collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new Utf8(y.getValue())));
+    Assert.assertEquals(expectedArray, message.getField("array_values").get());
+    Assert.assertEquals(expectedMap, message.getField("map_values").get());
+
+    Assert.assertTrue(message.getField("bytes_value").get().equals(testBytes));
+
+    LOG.info(Joiner.on(",").useForNull("null").join(message.getFieldValues()));
+    LOG.info(Joiner.on(",").join(message.getFieldNames()));
+
+    KV<Object, Object> samzaMessage = complexRecordAvroRelConverter.convertToSamzaMessage(message);
+    GenericRecord record = (GenericRecord) samzaMessage.getValue();
+
+    // The record converted back from the relational message must equal the decoded input, field by field.
+    for (Schema.Field field : ComplexRecord.SCHEMA$.getFields()) {
+      Assert.assertEquals(record.get(field.name()), complexRecordValue.get(field.name()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
new file mode 100644
index 0000000..30794fb
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -0,0 +1,143 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "ComplexRecord",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Record id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "bool_value",
+            "doc": "Boolean Value.",
+            "type": ["null", "boolean"],
+            "default":null
+        },
+        {
+            "name": "double_value",
+            "doc": "double Value.",
+            "type": ["null", "double"],
+            "default":null
+        },
+        {
+            "name": "float_value",
+            "doc": "float Value.",
+            "type": ["null", "float"],
+            "default":null
+        },
+        {
+            "name": "string_value",
+            "doc": "string Value.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "bytes_value",
+            "doc": "bytes Value.",
+            "type": ["null", "bytes"],
+            "default":null
+        },
+        {
+            "name": "long_value",
+            "doc": "long Value.",
+            "type": ["null", "long"],
+            "default":null
+        },
+        {
+            "name": "fixed_value",
+            "doc": "fixed Value.",
+            "type": ["null", {
+               "name": "MyFixed",
+               "type":"fixed",
+               "size":16
+            }
+          ]
+        },
+        {
+            "name": "array_values",
+            "doc" : "array values in the record.",
+            "default": [],
+            "type": [ "null",
+              {
+                "type": "array",
+                "items": "string"
+              }
+            ]
+        },
+        {
+            "name": "map_values",
+            "doc" : "map values in the record.",
+            "default": [],
+            "type": [ "null",
+              {
+                "type": "map",
+                "values": "string"
+              }
+            ]
+        },
+        {
+            "name": "enum_value",
+            "doc" : "enum value.",
+            "default": [],
+            "type": [ "null",
+            {
+               "name": "TestEnumType",
+               "namespace": "org.apache.samza.sql.system.avro",
+               "type": "enum",
+               "doc": "My sample enum type",
+               "symbols": ["foo", "bar"]
+            }
+          ]
+        },
+        {
+            "name": "array_records",
+            "doc" : "array of records.",
+            "default": [],
+            "type": [ "null",
+              {
+                "type": "record",
+                "name": "SubRecord",
+                "namespace": "org.apache.samza.sql.system.avro",
+                "doc": "Sub record",
+                "fields": [
+                  {
+                    "name": "id",
+                    "doc": "sub record id",
+                    "type": ["null", "int"]
+                  },
+                  {
+                    "name": "sub_values",
+                    "doc": "Sub record ",
+                    "type": {
+                      "type": "array",
+                      "items": "string"
+                    }
+                  }
+                ]
+              }
+            ]
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
new file mode 100644
index 0000000..dfba35e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+/**
+ * Avro-generated specific record used as a test fixture covering primitive, fixed, array, map,
+ * enum and nested-record fields.
+ *
+ * <p>Fields are public and are addressed positionally by {@link #get(int)} and
+ * {@link #put(int, Object)}; positions 0-11 follow the field order declared in {@link #SCHEMA$}.
+ * Note that the namespace embedded in the schema ({@code org.apache.samza.sql.system.avro})
+ * differs from this class's Java package.
+ */
+@SuppressWarnings("all")
+public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  /** Parsed Avro schema for this record; the JSON literal must stay on a single line. */
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}");
+  /** Record id. */
+  public java.lang.Integer id;
+  /** Boolean Value. */
+  public java.lang.Boolean bool_value;
+  /** double Value. */
+  public java.lang.Double double_value;
+  /** float Value. */
+  public java.lang.Float float_value;
+  /** string Value. */
+  public java.lang.CharSequence string_value;
+  /** bytes Value. */
+  public java.nio.ByteBuffer bytes_value;
+  /** long Value. */
+  public java.lang.Long long_value;
+  /** fixed Value. */
+  public MyFixed fixed_value;
+  /** array values in the record. */
+  public java.util.List<java.lang.CharSequence> array_values;
+  /** map values in the record. */
+  public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> map_values;
+  /** enum value. */
+  public TestEnumType enum_value;
+  /** array of records (despite the name, the declared type is a single {@link SubRecord}). */
+  public SubRecord array_records;
+  /** Returns the shared schema constant {@link #SCHEMA$}. */
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  // Returns the field stored at the given position (0-11); any other index raises
+  // AvroRuntimeException("Bad index").
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return bool_value;
+    case 2: return double_value;
+    case 3: return float_value;
+    case 4: return string_value;
+    case 5: return bytes_value;
+    case 6: return long_value;
+    case 7: return fixed_value;
+    case 8: return array_values;
+    case 9: return map_values;
+    case 10: return enum_value;
+    case 11: return array_records;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  // Casts the value to the field's declared type and stores it at the given position (0-11);
+  // any other index raises AvroRuntimeException("Bad index").
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: bool_value = (java.lang.Boolean)value$; break;
+    case 2: double_value = (java.lang.Double)value$; break;
+    case 3: float_value = (java.lang.Float)value$; break;
+    case 4: string_value = (java.lang.CharSequence)value$; break;
+    case 5: bytes_value = (java.nio.ByteBuffer)value$; break;
+    case 6: long_value = (java.lang.Long)value$; break;
+    case 7: fixed_value = (MyFixed)value$; break;
+    case 8: array_values = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 9: map_values = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
+    case 10: enum_value = (TestEnumType)value$; break;
+    case 11: array_records = (SubRecord)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
new file mode 100644
index 0000000..1405c84
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
@@ -0,0 +1,29 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+/**
+ * Avro-generated fixed-size binary type; the {@code FixedSize} annotation declares a length of
+ * 16 bytes. The class adds no members of its own beyond what {@code SpecificFixed} provides.
+ */
+@SuppressWarnings("all")
+@org.apache.avro.specific.FixedSize(16)
+public class MyFixed extends org.apache.avro.specific.SpecificFixed {}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
new file mode 100644
index 0000000..6b010a4
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "SimpleRecord",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "Record id.",
+            "type": ["null", "int"],
+            "default":null
+        },
+        {
+            "name": "name",
+            "doc" : "Some name.",
+            "type": ["null", "string"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
new file mode 100644
index 0000000..baf3812
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+/**
+ * Avro-generated specific record with two nullable fields: an integer {@code id} and a
+ * string {@code name}. Fields are addressed positionally (0 = id, 1 = name) by
+ * {@link #get(int)} and {@link #put(int, Object)}, matching the order in {@link #SCHEMA$}.
+ */
+@SuppressWarnings("all")
+public class SimpleRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  /** Parsed Avro schema for this record. */
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some name.\",\"default\":null}]}");
+  /** Record id. */
+  public java.lang.Integer id;
+  /** Some name. */
+  public java.lang.CharSequence name;
+  /** Returns the shared schema constant {@link #SCHEMA$}. */
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  // Returns the field at the given position; any index other than 0 or 1 raises
+  // AvroRuntimeException("Bad index").
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return name;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  // Casts the value to the field's declared type and stores it at the given position; any
+  // index other than 0 or 1 raises AvroRuntimeException("Bad index").
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: name = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SubRecord.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SubRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SubRecord.java
new file mode 100644
index 0000000..cd84b29
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SubRecord.java
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+/**
+ * Sub record: Avro-generated specific record with a nullable integer {@code id} and a list of
+ * strings {@code sub_values}. Fields are addressed positionally (0 = id, 1 = sub_values) by
+ * {@link #get(int)} and {@link #put(int, Object)}, matching the order in {@link #SCHEMA$}.
+ */
+public class SubRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  /** Parsed Avro schema for this record. */
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"SubRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}");
+  /** sub record id */
+  public java.lang.Integer id;
+  /** String values of the sub record (the schema's doc text for this field is "Sub record "). */
+  public java.util.List<java.lang.CharSequence> sub_values;
+  /** Returns the shared schema constant {@link #SCHEMA$}. */
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  // Returns the field at the given position; any index other than 0 or 1 raises
+  // AvroRuntimeException("Bad index").
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return id;
+    case 1: return sub_values;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  // Casts the value to the field's declared type and stores it at the given position; any
+  // index other than 0 or 1 raises AvroRuntimeException("Bad index").
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: id = (java.lang.Integer)value$; break;
+    case 1: sub_values = (java.util.List<java.lang.CharSequence>)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/TestEnumType.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/TestEnumType.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/TestEnumType.java
new file mode 100644
index 0000000..79d7bf6
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/TestEnumType.java
@@ -0,0 +1,31 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+/**
+ * My sample enum type: Avro-generated enum with the two symbols {@code foo} and {@code bar}.
+ * The lower-case constant names mirror the symbol names used in the Avro schema.
+ */
+public enum TestEnumType {
+  foo, bar
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
new file mode 100644
index 0000000..769c653
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java
@@ -0,0 +1,137 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.e2e;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.MyTestUdf;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * End-to-end tests that run Samza SQL statements through {@link SamzaSqlApplicationRunner}
+ * against the in-memory test Avro system and verify the messages collected in
+ * {@code TestAvroSystemFactory.messages}.
+ */
+public class TestSamzaSqlEndToEnd {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
+
+  /** Projects {@code id} plus a built-in expression and expects one output per input, ids 0..n-1. */
+  @Test
+  public void testEndToEnd() throws Exception {
+    int numMessages = 20;
+    String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1";
+    runSql(numMessages, Arrays.asList(sql1));
+
+    List<Integer> outMessages = sortedIntField("id");
+    Assert.assertEquals(numMessages, outMessages.size());
+    // assertEquals (rather than assertTrue(equals)) reports both lists when the check fails.
+    Assert.assertEquals(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()), outMessages);
+  }
+
+  /** Flattens the {@code array_values} field of the complex stream and checks the output count. */
+  @Test
+  public void testEndToEndFlatten() throws Exception {
+    int numMessages = 20;
+    LOG.info(" Class Path : {}", RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
+    String sql1 =
+        "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1";
+    runSql(numMessages, Collections.singletonList(sql1));
+
+    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
+    Assert.assertEquals(expectedFlattenedCount(numMessages), outMessages.size());
+  }
+
+  /** Flattens the array produced by the {@code MyTestArray} UDF in a sub-query and checks the output count. */
+  @Test
+  public void testEndToEndSubQuery() throws Exception {
+    int numMessages = 20;
+    String sql1 =
+        "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)";
+    runSql(numMessages, Collections.singletonList(sql1));
+
+    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
+    Assert.assertEquals(expectedFlattenedCount(numMessages), outMessages.size());
+  }
+
+  /** Applies the {@code MyTest} UDF to {@code id} and compares against direct {@link MyTestUdf} calls. */
+  @Test
+  public void testEndToEndUdf() throws Exception {
+    int numMessages = 20;
+    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
+    runSql(numMessages, Collections.singletonList(sql1));
+
+    LOG.info("output Messages {}", TestAvroSystemFactory.messages);
+
+    List<Integer> outMessages = sortedIntField("long_value");
+    // JUnit's contract is assertEquals(expected, actual); the original had the two swapped.
+    Assert.assertEquals(numMessages, outMessages.size());
+    MyTestUdf udf = new MyTestUdf();
+
+    Assert.assertEquals(
+        IntStream.range(0, numMessages).map(udf::execute).boxed().collect(Collectors.toList()), outMessages);
+  }
+
+  /**
+   * Clears previously captured output, builds the static test config for {@code numMessages}
+   * input messages, installs the given SQL statements and runs the application to completion.
+   */
+  private static void runSql(int numMessages, List<String> sqlStmts) throws Exception {
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+  }
+
+  /** Reads the named field of every captured output record as an integer and returns the sorted values. */
+  private static List<Integer> sortedIntField(String fieldName) {
+    return TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get(fieldName).toString()))
+        .sorted()
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Number of output records expected after flattening.
+   * Flatten de-normalizes the data. So there is separate record for each entry in the array.
+   */
+  private static int expectedFlattenedCount(int numMessages) {
+    int expectedMessages = 0;
+    // Kept exactly as in the original tests: starts at 1, so Math.max(1, index) equals index here.
+    for (int index = 1; index < numMessages; index++) {
+      expectedMessages = expectedMessages + Math.max(1, index);
+    }
+    return expectedMessages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
new file mode 100644
index 0000000..aa5ac1b
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
@@ -0,0 +1,83 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.system;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Console logging System factory that just writes the messages to the console output.
+ * This system factory is useful when the user wants to print the output of the stream processing to console.
+ *
+ * <p>Only the producer side is supported: {@link #getConsumer} always throws
+ * {@link NotImplementedException}.
+ */
+public class ConsoleLoggingSystemFactory implements SystemFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConsoleLoggingSystemFactory.class);
+
+  /**
+   * Not supported by this output-only system.
+   *
+   * @throws NotImplementedException always
+   */
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    throw new NotImplementedException();
+  }
+
+  /** Returns a new producer that logs and prints every message it is asked to send. */
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new LoggingSystemProducer();
+  }
+
+  /** Returns a {@link SimpleSystemAdmin} built from the given config. */
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SimpleSystemAdmin(config);
+  }
+
+  /**
+   * Producer that writes each outgoing envelope to the log and to standard output.
+   * Declared static because it uses no state of the enclosing factory instance.
+   */
+  private static final class LoggingSystemProducer implements SystemProducer {
+    @Override
+    public void start() {
+      // Nothing to start.
+    }
+
+    @Override
+    public void stop() {
+      // Nothing to release.
+    }
+
+    @Override
+    public void register(String source) {
+      // Parameterized form adds the separator that the original concatenation was missing.
+      LOG.info("Registering source {}", source);
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      String msg = String.format("OutputStream:%s Key:%s Value:%s", envelope.getSystemStream(), envelope.getKey(),
+          envelope.getMessage());
+      LOG.info(msg);
+      System.out.println(msg);
+    }
+
+    @Override
+    public void flush(String source) {
+      // Messages are written synchronously in send(); nothing is buffered here.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
new file mode 100644
index 0000000..5655a81
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.system;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Minimal {@link SystemAdmin} for test systems: every stream is reported as having a single
+ * partition 0 with no offset information, and offsets are compared as plain strings.
+ */
+public class SimpleSystemAdmin implements SystemAdmin {
+
+  /**
+   * @param config accepted for signature compatibility with callers; not read by this class
+   */
+  public SimpleSystemAdmin(Config config) {
+  }
+
+  /**
+   * Returns a map with the same partitions as {@code offsets}, each mapped to {@code null}.
+   *
+   * @param offsets current offsets keyed by partition; only the keys are used
+   * @return a new mutable map from each input partition to {@code null}
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    // Collectors.toMap cannot express this: a null value mapper (as originally passed) throws
+    // NullPointerException on the first entry, and toMap rejects null values anyway.
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+    for (SystemStreamPartition ssp : offsets.keySet()) {
+      offsetsAfter.put(ssp, null);
+    }
+    return offsetsAfter;
+  }
+
+  /**
+   * Builds metadata for each requested stream consisting of one partition (0) whose
+   * partition metadata is constructed with three {@code null} arguments.
+   *
+   * @param streamNames names of the streams to describe
+   * @return metadata keyed by stream name
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return streamNames.stream()
+        .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
+            Collections.singletonMap(new Partition(0),
+                new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+  }
+
+  /**
+   * Compares two offsets, ordering {@code null} before any non-null offset and otherwise
+   * using {@link String#compareTo(String)} (lexicographic, not numeric).
+   *
+   * @return a negative, zero or positive value as {@code offset1} sorts before, equal to or
+   *     after {@code offset2}
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (offset1 == null) {
+      return offset2 == null ? 0 : -1;
+    } else if (offset2 == null) {
+      return 1;
+    }
+    return offset1.compareTo(offset2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
new file mode 100644
index 0000000..3a9ae16
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -0,0 +1,156 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.system;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link SystemFactory} used by the Samza SQL tests. Its consumer fabricates Avro records instead of reading
+ * from a real system, and its producer appends every outgoing envelope to the static {@link #messages} list.
+ */
+public class TestAvroSystemFactory implements SystemFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAvroSystemFactory.class);
+
+  /** Last segment of the {@code systems.<system>.numMessages} config key read by the consumer. */
+  public static final String CFG_NUM_MESSAGES = "numMessages";
+
+  /** Envelopes passed to {@code send} on producers created by this factory. */
+  public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return new TestAvroSystemConsumer(systemName, config);
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new TestAvroSystemProducer();
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SimpleSystemAdmin(config);
+  }
+
+  /**
+   * Consumer that synthesizes records for each polled partition and finishes the batch with one envelope
+   * carrying the end-of-stream offset.
+   */
+  private class TestAvroSystemConsumer implements SystemConsumer {
+    public static final int DEFAULT_NUM_EVENTS = 10;
+    private final int numMessages;
+    // Set by register(); the most recently registered stream decides the record type.
+    private boolean simpleRecord;
+
+    public TestAvroSystemConsumer(String systemName, Config config) {
+      String numMessagesKey = String.format("systems.%s.%s", systemName, CFG_NUM_MESSAGES);
+      numMessages = config.getInt(numMessagesKey, DEFAULT_NUM_EVENTS);
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void register(SystemStreamPartition systemStreamPartition, String offset) {
+      String lowerCaseStream = systemStreamPartition.getStream().toLowerCase();
+      simpleRecord = lowerCaseStream.contains("simple");
+    }
+
+    @Override
+    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long timeout)
+        throws InterruptedException {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled = new HashMap<>();
+      for (SystemStreamPartition partition : set) {
+        List<IncomingMessageEnvelope> batch = new ArrayList<>();
+        // Indices below numMessages are regular records; the extra last envelope marks end of stream.
+        for (int i = 0; i < numMessages + 1; i++) {
+          batch.add(new IncomingMessageEnvelope(partition,
+              i == numMessages ? IncomingMessageEnvelope.END_OF_STREAM_OFFSET : null, "key" + i, getData(i)));
+        }
+        polled.put(partition, batch);
+      }
+      return polled;
+    }
+
+    /** Picks the record flavour selected by the last {@code register} call. */
+    private Object getData(int index) {
+      return simpleRecord ? createSimpleRecord(index) : createComplexRecord(index);
+    }
+
+    /** Creates a {@link SimpleRecord}-shaped record with {@code id} and {@code name} derived from the index. */
+    private Object createSimpleRecord(int index) {
+      GenericRecord simple = new GenericData.Record(SimpleRecord.SCHEMA$);
+      simple.put("id", index);
+      simple.put("name", "Name" + index);
+      return simple;
+    }
+
+    /**
+     * Creates a {@link ComplexRecord}-shaped record whose {@code array_values} field holds the strings
+     * "0" up to {@code index - 1}.
+     */
+    private Object createComplexRecord(int index) {
+      GenericRecord complex = new GenericData.Record(ComplexRecord.SCHEMA$);
+      complex.put("id", index);
+      complex.put("string_value", "Name" + index);
+      // The field schema is a union; its second branch is used as the array schema.
+      GenericData.Array<String> values =
+          new GenericData.Array<>(index, ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1));
+      List<String> indexStrings = IntStream.range(0, index).mapToObj(String::valueOf).collect(Collectors.toList());
+      values.addAll(indexStrings);
+      complex.put("array_values", values);
+      return complex;
+    }
+  }
+
+  /**
+   * Producer that logs each envelope and stores it in {@link TestAvroSystemFactory#messages}.
+   */
+  private class TestAvroSystemProducer implements SystemProducer {
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void register(String source) {
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      LOG.info("Adding message " + envelope);
+      messages.add(envelope);
+    }
+
+    @Override
+    public void flush(String source) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
new file mode 100644
index 0000000..c9bc9bc
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+public class MyTestArrayUdf implements ScalarUdf {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  public Object execute(Object... args) {
+    Integer value = (Integer) args[0];
+    return IntStream.range(0, value).mapToObj(String::valueOf).collect(Collectors.toList());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
new file mode 100644
index 0000000..baca367
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test UDF used by unit and integration tests.
+ */
+public class MyTestUdf implements ScalarUdf {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
+
+  public Integer execute(Object... value) {
+    return ((Integer) value[0]) * 2;
+  }
+
+  @Override
+  public void init(Config udfConfig) {
+    LOG.info("Init called with {}", udfConfig);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
new file mode 100644
index 0000000..760d5d0
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -0,0 +1,103 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import com.google.common.base.Joiner;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.sql.avro.AvroRelConverterFactory;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+
+
+/**
+ * Utility that assembles the configs needed to run the Samza Sql application in tests.
+ */
+public class SamzaSqlTestConfig {
+
+  public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+
+  /**
+   * Builds the static test configuration: job settings, the "config" source and UDF resolvers, the
+   * {@code testavro} system backed by {@link TestAvroSystemFactory}, the "avro" rel converter and the
+   * "config" schema provider with schemas for the SIMPLE1, outputTopic and COMPLEX1 streams.
+   *
+   * @param numberOfMessages value stored under the test system's {@code numMessages} key
+   * @return a new mutable map holding all of the entries above
+   */
+  public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
+    Map<String, String> configs = new HashMap<>();
+
+    // Job-level settings.
+    configs.put(JobConfig.JOB_NAME(), "sql-job");
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    // Source resolver.
+    String sourceResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    configs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
+    configs.put(sourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedSourceResolverFactory.class.getName());
+
+    // UDF resolver and the UDF classes it exposes.
+    String udfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+    String udfClasses =
+        Joiner.on(",").join(MyTestUdf.class.getName(), FlattenUdf.class.getName(), MyTestArrayUdf.class.getName());
+    configs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
+    configs.put(udfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, ConfigBasedUdfResolver.class.getName());
+    configs.put(udfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, udfClasses);
+
+    // Test avro system: Samza-level settings first, then the SQL-level settings for the same system.
+    String systemPrefix = String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
+    String sqlSystemPrefix = sourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
+    configs.put(systemPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+    configs.put(systemPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES, String.valueOf(numberOfMessages));
+    configs.put(sqlSystemPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    configs.put(sqlSystemPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    // Rel converter.
+    String relConverterDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+    configs.put(relConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY, AvroRelConverterFactory.class.getName());
+
+    // Schema provider and the per-stream schemas.
+    String schemaProviderDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
+    configs.put(schemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedAvroRelSchemaProviderFactory.class.getName());
+    putSourceSchema(configs, schemaProviderDomain, "SIMPLE1", SimpleRecord.SCHEMA$.toString());
+    putSourceSchema(configs, schemaProviderDomain, "outputTopic", ComplexRecord.SCHEMA$.toString());
+    putSourceSchema(configs, schemaProviderDomain, "COMPLEX1", ComplexRecord.SCHEMA$.toString());
+
+    return configs;
+  }
+
+  /** Stores {@code schemaJson} as the schema of {@code streamName} on the test avro system. */
+  private static void putSourceSchema(Map<String, String> configs, String schemaProviderDomain, String streamName,
+      String schemaJson) {
+    String schemaKey = schemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        SAMZA_SYSTEM_TEST_AVRO, streamName);
+    configs.put(schemaKey, schemaJson);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/resources/log4j.xml b/samza-sql/src/test/resources/log4j.xml
new file mode 100644
index 0000000..6259b48
--- /dev/null
+++ b/samza-sql/src/test/resources/log4j.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  @log4j.appenders.webapp@
+
+  @log4j.appenders.public_access@
+
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+    </layout>
+  </appender>
+
+  @log4j.loggers.spring@
+
+  @log4j.loggers.public_access@
+  <logger name="org.apache" additivity="false">
+    <level value="DEBUG"/>
+    <appender-ref ref="console"/>
+  </logger>
+
+  @log4j.loggers.root@
+  <root>
+    <priority value ="DEBUG" />
+    <appender-ref ref="console"/>
+  </root>
+
+
+</log4j:configuration>
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 4c81d9c..0fe3dfa 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,7 +23,10 @@ include \
   'samza-log4j',
   'samza-rest',
   'samza-shell',
-  'samza-azure'
+  'samza-azure',
+  'samza-sql'
+
+
 
 def scalaModules = [
         'samza-core',


[2/3] samza git commit: Samza SQL implementation for basic projects, filtering and UDFs

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
new file mode 100644
index 0000000..ce03ba3
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -0,0 +1,153 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.externalize.RelJsonWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * QueryPlanner that uses the calcite engine to convert a sql query into a relational graph.
+ */
+public class QueryPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryPlanner.class);
+
+  private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
+  private final Collection<UdfMetadata> udfMetadata;
+  private final Map<SystemStream, RelSchemaProvider> relSchemaProviders;
+
+  /**
+   * @param relSchemaProviders schema provider for each system stream referenced by the configs
+   * @param systemStreamConfigBySource stream configs whose streams are exposed as tables to the planner
+   * @param udfMetadata UDFs to expose to the planner as scalar functions
+   */
+  public QueryPlanner(Map<SystemStream, RelSchemaProvider> relSchemaProviders,
+      Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
+    this.relSchemaProviders = relSchemaProviders;
+    this.systemStreamConfigBySource = systemStreamConfigBySource;
+    this.udfMetadata = udfMetadata;
+  }
+
+  /**
+   * Parses, validates and converts {@code query} into a relational plan, logging the parsed query and the
+   * resulting graph.
+   *
+   * @param query sql statement to plan
+   * @return the root of the relational plan
+   * @throws SamzaException wrapping any exception raised while planning
+   */
+  public RelRoot plan(String query) {
+    try {
+      Connection connection = DriverManager.getConnection("jdbc:calcite:");
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      SchemaPlus rootSchema = calciteConnection.getRootSchema();
+      registerStreamTables(rootSchema);
+
+      Planner planner = Frameworks.getPlanner(buildFrameworkConfig(rootSchema));
+
+      SqlNode parsedSql = planner.parse(query);
+      SqlNode validatedSql = planner.validate(parsedSql);
+      RelRoot relRoot = planner.rel(validatedSql);
+      LOG.info("query plan:\n" + parsedSql.toString());
+      LOG.info("relational graph:");
+      printRelGraph(relRoot.project());
+      return relRoot;
+    } catch (Exception e) {
+      LOG.error("Query planner failed with exception.", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /** Adds one sub-schema per system to {@code rootSchema} and one table per configured stream. */
+  private void registerStreamTables(SchemaPlus rootSchema) {
+    Map<String, SchemaPlus> schemaBySystem = new HashMap<>();
+    for (SqlSystemStreamConfig streamConfig : systemStreamConfigBySource.values()) {
+      RelSchemaProvider schemaProvider = relSchemaProviders.get(streamConfig.getSystemStream());
+      // Streams of the same system share a single sub-schema, created on first use.
+      SchemaPlus systemSchema = schemaBySystem.computeIfAbsent(streamConfig.getSystemName(),
+          systemName -> rootSchema.add(systemName, new AbstractSchema()));
+      RelDataType relationalSchema = schemaProvider.getRelationalSchema();
+      systemSchema.add(streamConfig.getStreamName(), createTableFromRelSchema(relationalSchema));
+    }
+  }
+
+  /** Builds the calcite framework config: Java lexing, Samza operator tables and the configured UDFs. */
+  private FrameworkConfig buildFrameworkConfig(SchemaPlus rootSchema) {
+    List<SamzaSqlScalarFunctionImpl> udfFunctions = udfMetadata.stream()
+        .map(udf -> new SamzaSqlScalarFunctionImpl(udf.getName(), udf.getUdfMethod()))
+        .collect(Collectors.toList());
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    List<SqlOperatorTable> operatorTables = new ArrayList<>();
+    operatorTables.add(new SamzaSqlOperatorTable());
+    operatorTables.add(new SamzaSqlUdfOperatorTable(udfFunctions));
+
+    SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build();
+    return Frameworks.newConfigBuilder()
+        .parserConfig(parserConfig)
+        .defaultSchema(rootSchema)
+        .operatorTable(new ChainedSqlOperatorTable(operatorTables))
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .costFactory(null)
+        .build();
+  }
+
+  /**
+   * Wraps {@code relationalSchema} in a table whose row type is a nullable VARCHAR field named
+   * {@link SamzaSqlRelMessage#KEY_NAME} followed by all fields of the schema.
+   */
+  private Table createTableFromRelSchema(RelDataType relationalSchema) {
+    return new AbstractTable() {
+      @Override
+      public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+        RelDataType nullableVarchar =
+            typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true);
+        List<RelDataTypeField> rowFields = new ArrayList<>();
+        rowFields.add(new RelDataTypeFieldImpl(SamzaSqlRelMessage.KEY_NAME, 0, nullableVarchar));
+        rowFields.addAll(relationalSchema.getFieldList());
+        return new RelRecordType(rowFields);
+      }
+    };
+  }
+
+  /** Logs the JSON rendering of the plan rooted at {@code node}. */
+  private static void printRelGraph(RelNode node) {
+    RelJsonWriter planWriter = new RelJsonWriter();
+    node.explain(planWriter);
+    LOG.info(planWriter.asString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
new file mode 100644
index 0000000..b078f5b
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -0,0 +1,101 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlAsOperator;
+import org.apache.calcite.sql.SqlBinaryOperator;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlPostfixOperator;
+import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.fun.SqlDatePartFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+
+
+/**
+ * List of all operators supported in Samza Sql. This is a subset of the calcite operators: every field below
+ * simply re-exposes the operator of the same name from {@link SqlStdOperatorTable}.
+ */
+public class SamzaSqlOperatorTable extends ReflectiveSqlOperatorTable {
+  // Binary operators and AS.
+  public static final SqlBinaryOperator AND = SqlStdOperatorTable.AND;
+  public static final SqlAsOperator AS = SqlStdOperatorTable.AS;
+  public static final SqlBinaryOperator CONCAT = SqlStdOperatorTable.CONCAT;
+  public static final SqlBinaryOperator DIVIDE = SqlStdOperatorTable.DIVIDE;
+  public static final SqlBinaryOperator DOT = SqlStdOperatorTable.DOT;
+  public static final SqlBinaryOperator EQUALS = SqlStdOperatorTable.EQUALS;
+  public static final SqlBinaryOperator GREATER_THAN = SqlStdOperatorTable.GREATER_THAN;
+  public static final SqlBinaryOperator GREATER_THAN_OR_EQUAL = SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
+  public static final SqlBinaryOperator IN = SqlStdOperatorTable.IN;
+  public static final SqlBinaryOperator NOT_IN = SqlStdOperatorTable.NOT_IN;
+  public static final SqlBinaryOperator LESS_THAN = SqlStdOperatorTable.LESS_THAN;
+  public static final SqlBinaryOperator LESS_THAN_OR_EQUAL = SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
+  public static final SqlBinaryOperator MINUS = SqlStdOperatorTable.MINUS;
+  public static final SqlBinaryOperator MULTIPLY = SqlStdOperatorTable.MULTIPLY;
+  public static final SqlBinaryOperator NOT_EQUALS = SqlStdOperatorTable.NOT_EQUALS;
+  public static final SqlBinaryOperator OR = SqlStdOperatorTable.OR;
+  public static final SqlBinaryOperator PLUS = SqlStdOperatorTable.PLUS;
+  // Postfix operators.
+  public static final SqlPostfixOperator IS_NOT_NULL = SqlStdOperatorTable.IS_NOT_NULL;
+  public static final SqlPostfixOperator IS_NULL = SqlStdOperatorTable.IS_NULL;
+  public static final SqlPostfixOperator IS_NOT_TRUE = SqlStdOperatorTable.IS_NOT_TRUE;
+  public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE;
+  public static final SqlPostfixOperator IS_NOT_FALSE = SqlStdOperatorTable.IS_NOT_FALSE;
+  public static final SqlPostfixOperator IS_FALSE = SqlStdOperatorTable.IS_FALSE;
+  // Prefix operators.
+  public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT;
+  public static final SqlPrefixOperator UNARY_MINUS = SqlStdOperatorTable.UNARY_MINUS;
+  public static final SqlPrefixOperator UNARY_PLUS = SqlStdOperatorTable.UNARY_PLUS;
+  public static final SqlPrefixOperator EXPLICIT_TABLE = SqlStdOperatorTable.EXPLICIT_TABLE;
+
+  // Built-in functions.
+  public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH;
+  public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING;
+  public static final SqlFunction REPLACE = SqlStdOperatorTable.REPLACE;
+  public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
+  public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
+  public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
+  public static final SqlFunction POWER = SqlStdOperatorTable.POWER;
+  public static final SqlFunction SQRT = SqlStdOperatorTable.SQRT;
+  public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
+  public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
+  public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
+  public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
+  public static final SqlFunction LOCALTIMESTAMP = SqlStdOperatorTable.LOCALTIMESTAMP;
+  public static final SqlFunction CURRENT_TIME = SqlStdOperatorTable.CURRENT_TIME;
+  public static final SqlFunction CURRENT_TIMESTAMP = SqlStdOperatorTable.CURRENT_TIMESTAMP;
+  public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
+  public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
+  public static final SqlFunction TIMESTAMP_DIFF = SqlStdOperatorTable.TIMESTAMP_DIFF;
+  public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+  public static final SqlDatePartFunction MONTH = SqlStdOperatorTable.MONTH;
+  // Aggregate functions.
+  public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT;
+  public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM;
+  public static final SqlAggFunction SUM0 = SqlStdOperatorTable.SUM0;
+  // TUMBLE and its START/END companions.
+  public static final SqlFunction TUMBLE = SqlStdOperatorTable.TUMBLE;
+  public static final SqlFunction TUMBLE_END = SqlStdOperatorTable.TUMBLE_END;
+  public static final SqlFunction TUMBLE_START = SqlStdOperatorTable.TUMBLE_START;
+  // init() is inherited; per Calcite's docs it registers this class's public static operator fields.
+  public SamzaSqlOperatorTable() {
+    init();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
new file mode 100644
index 0000000..1200d57
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
@@ -0,0 +1,84 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * Calcite scalar function backed by a Samza SQL UDF method.
+ *
+ * <p>Return type and parameters are derived from the UDF method through {@link ScalarFunctionImpl}. The call
+ * implementation generates code that fetches the UDF instance from a {@link SamzaSqlExecutionContext} and
+ * invokes the UDF method on it.
+ */
+public class SamzaSqlScalarFunctionImpl implements ScalarFunction, ImplementableFunction {
+
+  /** Name of the {@link SamzaSqlExecutionContext} method the generated code calls to obtain the UDF instance. */
+  private static final String GET_UDF_METHOD_NAME = "getOrCreateUdf";
+
+  // Wrapper created from the UDF method; used only to answer getReturnType and getParameters.
+  private final ScalarFunction myIncFunction;
+  private final Method udfMethod;
+  private final Method getUdfMethod;
+  private final String udfName;
+
+  /**
+   * @param udfName name under which the UDF is registered; passed to the execution context when fetching the UDF
+   * @param udfMethod method that is invoked when the UDF is called
+   * @throws IllegalStateException if {@link SamzaSqlExecutionContext} has no public method named
+   *         {@code getOrCreateUdf}
+   */
+  public SamzaSqlScalarFunctionImpl(String udfName, Method udfMethod) {
+    myIncFunction = ScalarFunctionImpl.create(udfMethod);
+    this.udfName = udfName;
+    this.udfMethod = udfMethod;
+    // Looked up by name only: the first public method called getOrCreateUdf is used. Fail with a descriptive
+    // message instead of a bare NoSuchElementException if the context class no longer exposes it.
+    this.getUdfMethod = Arrays.stream(SamzaSqlExecutionContext.class.getMethods())
+        .filter(method -> GET_UDF_METHOD_NAME.equals(method.getName()))
+        .findFirst()
+        .orElseThrow(() -> new IllegalStateException(
+            String.format("Method %s not found on %s", GET_UDF_METHOD_NAME,
+                SamzaSqlExecutionContext.class.getName())));
+  }
+
+  /**
+   * @return the name this UDF was registered with
+   */
+  public String getUdfName() {
+    return udfName;
+  }
+
+  /**
+   * Builds the implementor that generates the code for a call to this UDF.
+   *
+   * <p>The generated expression references a parameter named {@code context} of type
+   * {@link SamzaSqlExecutionContext}, calls {@code getOrCreateUdf} on it with the UDF's declaring class name and
+   * the UDF name, casts the result to the UDF's declaring class and invokes the UDF method with the translated
+   * operands.
+   */
+  @Override
+  public CallImplementor getImplementor() {
+    return RexImpTable.createImplementor((translator, call, translatedOperands) -> {
+      final Expression context = Expressions.parameter(SamzaSqlExecutionContext.class, "context");
+      final Expression getUdfInstance = Expressions.call(ScalarUdf.class, context, getUdfMethod,
+          Expressions.constant(udfMethod.getDeclaringClass().getName()), Expressions.constant(udfName));
+      return Expressions.call(Expressions.convert_(getUdfInstance, udfMethod.getDeclaringClass()), udfMethod,
+          translatedOperands);
+    }, NullPolicy.NONE, false);
+  }
+
+  @Override
+  public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+    return myIncFunction.getReturnType(typeFactory);
+  }
+
+  @Override
+  public List<FunctionParameter> getParameters() {
+    return myIncFunction.getParameters();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
new file mode 100644
index 0000000..476e9b0
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.util.ListSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+
+public class SamzaSqlUdfOperatorTable implements SqlOperatorTable {
+
+  private final ListSqlOperatorTable operatorTable;
+
+  public SamzaSqlUdfOperatorTable(List<SamzaSqlScalarFunctionImpl> scalarFunctions) {
+    operatorTable = new ListSqlOperatorTable(getSqlOperators(scalarFunctions));
+  }
+
+  private List<SqlOperator> getSqlOperators(List<SamzaSqlScalarFunctionImpl> scalarFunctions) {
+    return scalarFunctions.stream().map(this::getSqlOperator).collect(Collectors.toList());
+  }
+
+  private SqlOperator getSqlOperator(SamzaSqlScalarFunctionImpl scalarFunction) {
+    return new SqlUserDefinedFunction(new SqlIdentifier(scalarFunction.getUdfName(), SqlParserPos.ZERO),
+        o -> scalarFunction.getReturnType(o.getTypeFactory()), null, Checker.ANY_CHECKER, null, scalarFunction);
+  }
+
+  @Override
+  public void lookupOperatorOverloads(SqlIdentifier opName, SqlFunctionCategory category, SqlSyntax syntax,
+      List<SqlOperator> operatorList) {
+    operatorTable.lookupOperatorOverloads(opName, category, syntax, operatorList);
+  }
+
+  @Override
+  public List<SqlOperator> getOperatorList() {
+    return operatorTable.getOperatorList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
new file mode 100644
index 0000000..181971a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.runner;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.sql.translator.QueryTranslator;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Entry point for the SamzaSQl stream application that takes in SQL as input and performs stream processing.
+ */
+public class SamzaSqlApplication implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplication.class);
+
+  @Override
+  public void init(StreamGraph streamGraph, Config config) {
+    try {
+      SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config);
+      QueryTranslator queryTranslator = new QueryTranslator(sqlConfig);
+      List<SamzaSqlQueryParser.QueryInfo> queries = sqlConfig.getQueryInfo();
+      for (SamzaSqlQueryParser.QueryInfo query : queries) {
+        LOG.info("Translating the query {} to samza stream graph", query.getSelectQuery());
+        queryTranslator.translate(query, streamGraph);
+      }
+    } catch (RuntimeException e) {
+      LOG.error("SamzaSqlApplication threw exception.", e);
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
new file mode 100644
index 0000000..f029745
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -0,0 +1,245 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.runner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SourceResolverFactory;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.interfaces.UdfResolver;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
+import org.apache.samza.sql.testutil.SqlFileParser;
+import org.apache.samza.system.SystemStream;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class representing the Samza SQL application config.
+ *
+ * <p>On construction it reads the SQL statements from the supplied config, parses them, resolves every input
+ * and output source referenced by the queries through the configured {@link SourceResolver}, and instantiates
+ * one {@link RelSchemaProvider} and one {@link SamzaRelConverter} per resolved system stream.
+ */
+public class SamzaSqlApplicationConfig {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplicationConfig.class);
+  public static final String CFG_SQL_STMT = "samza.sql.stmt";
+  public static final String CFG_SQL_STMTS_JSON = "samza.sql.stmts.json";
+  public static final String CFG_SQL_FILE = "samza.sql.sqlFile";
+
+  public static final String CFG_UDF_CONFIG_DOMAIN = "samza.sql.udf";
+
+  public static final String CFG_FACTORY = "factory";
+
+  public static final String CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN = "samza.sql.relSchemaProvider.%s.";
+  public static final String CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN = "samza.sql.relConverter.%s.";
+
+  public static final String CFG_SOURCE_RESOLVER = "samza.sql.sourceResolver";
+  public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = "samza.sql.sourceResolver.%s.";
+
+  public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
+  public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
+  private final Map<SystemStream, RelSchemaProvider> relSchemaProvidersBySystemStream;
+  private final Map<SystemStream, SamzaRelConverter> samzaRelConvertersBySystemStream;
+
+  private final SourceResolver sourceResolver;
+  private final UdfResolver udfResolver;
+
+  private final Collection<UdfMetadata> udfMetadata;
+
+  private final Map<String, SqlSystemStreamConfig> inputSystemStreamConfigBySource;
+
+  private final Map<String, SqlSystemStreamConfig> outputSystemStreamConfigsBySource;
+
+  private final List<String> sql;
+
+  private final List<QueryInfo> queryInfo;
+
+  /**
+   * Builds the SQL application config from the job config.
+   *
+   * @param staticConfig job config containing the SQL (see {@link #fetchSqlFromConfig(Map)}) as well as the
+   *                     source resolver, udf resolver, rel schema provider and rel converter settings
+   */
+  public SamzaSqlApplicationConfig(Config staticConfig) {
+
+    sql = fetchSqlFromConfig(staticConfig);
+
+    queryInfo = fetchQueryInfo(sql);
+
+    sourceResolver = createSourceResolver(staticConfig);
+
+    udfResolver = createUdfResolver(staticConfig);
+    udfMetadata = udfResolver.getUdfs();
+
+    // distinct(): the same source may be read by several queries (or several times by one query); without
+    // de-duplication Collectors.toMap fails with IllegalStateException on the repeated key.
+    inputSystemStreamConfigBySource = queryInfo.stream()
+        .map(QueryInfo::getInputSources)
+        .flatMap(Collection::stream)
+        .distinct()
+        .collect(Collectors.toMap(Function.identity(), sourceResolver::fetchSourceInfo));
+
+    Set<SqlSystemStreamConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
+
+    // distinct(): several queries may write to the same output source.
+    outputSystemStreamConfigsBySource = queryInfo.stream()
+        .map(QueryInfo::getOutputSource)
+        .distinct()
+        .collect(Collectors.toMap(Function.identity(), sourceResolver::fetchSourceInfo));
+    systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
+
+    // NOTE(review): the two maps below assume systemStreamConfigs holds at most one entry per SystemStream;
+    // toMap throws on a duplicate key - confirm the equality semantics of SqlSystemStreamConfig.
+    relSchemaProvidersBySystemStream = systemStreamConfigs.stream()
+        .collect(Collectors.toMap(SqlSystemStreamConfig::getSystemStream,
+            x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
+                CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
+                (o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
+
+    // Each converter receives the schema provider created above for the same system stream.
+    samzaRelConvertersBySystemStream = systemStreamConfigs.stream()
+        .collect(Collectors.toMap(SqlSystemStreamConfig::getSystemStream,
+            x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
+                CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
+                    relSchemaProvidersBySystemStream.get(x.getSystemStream()), c))));
+  }
+
+  /**
+   * Instantiates a plugin through the factory class named in its config domain.
+   *
+   * @param pluginName human readable plugin kind, used for logging only
+   * @param plugin name of the plugin; substituted into {@code pluginDomainFormat} to obtain the config domain
+   * @param staticConfig full job config
+   * @param pluginDomainFormat format string of the plugin's config domain
+   * @param factoryInvoker creates the plugin from the instantiated factory and the plugin's config subset
+   * @return the plugin returned by {@code factoryInvoker}
+   */
+  private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
+      String pluginDomainFormat, BiFunction<Object, Config, T> factoryInvoker) {
+    String pluginDomain = String.format(pluginDomainFormat, plugin);
+    Config pluginConfig = staticConfig.subset(pluginDomain);
+    String factoryName = pluginConfig.getOrDefault(CFG_FACTORY, "");
+    Validate.notEmpty(factoryName, String.format("Factory is not set for %s", plugin));
+    Object factory = ReflectionUtils.createInstance(factoryName);
+    Validate.notNull(factory, String.format("Factory creation failed for %s", plugin));
+    LOG.info("Instantiating {} using factory {} with props {}", pluginName, factoryName, pluginConfig);
+    return factoryInvoker.apply(factory, pluginConfig);
+  }
+
+  /**
+   * Parses every SQL statement into a {@link QueryInfo}, preserving the order of the statements.
+   *
+   * @param sqlStmts SQL statements to parse
+   * @return one {@link QueryInfo} per statement
+   */
+  public static List<QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
+    return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
+  }
+
+  /**
+   * Reads the SQL statements from the config. The first of the following that applies wins: a non-blank
+   * {@link #CFG_SQL_STMT} (single statement), a non-blank {@link #CFG_SQL_STMTS_JSON} (JSON list of
+   * statements), or a {@link #CFG_SQL_FILE} entry (file parsed by {@link SqlFileParser}).
+   *
+   * @param config config to read from
+   * @return the SQL statements
+   * @throws SamzaException if none of the three configs provides the SQL
+   */
+  public static List<String> fetchSqlFromConfig(Map<String, String> config) {
+    // isNotBlank(null) is false, so a missing key needs no separate containsKey check.
+    String singleStmt = config.get(CFG_SQL_STMT);
+    if (StringUtils.isNotBlank(singleStmt)) {
+      return Collections.singletonList(singleStmt);
+    }
+
+    String stmtsJson = config.get(CFG_SQL_STMTS_JSON);
+    if (StringUtils.isNotBlank(stmtsJson)) {
+      return deserializeSqlStmts(stmtsJson);
+    }
+
+    if (config.containsKey(CFG_SQL_FILE)) {
+      return SqlFileParser.parseSqlFile(config.get(CFG_SQL_FILE));
+    }
+
+    String msg = "Config doesn't contain the SQL that needs to be executed.";
+    LOG.error(msg);
+    throw new SamzaException(msg);
+  }
+
+  private static List<String> deserializeSqlStmts(String value) {
+    Validate.notEmpty(value, "json Value is not set or empty");
+    return JsonUtil.fromJson(value, new TypeReference<List<String>>() {
+    });
+  }
+
+  /**
+   * Serializes the SQL statements into the JSON form expected under {@link #CFG_SQL_STMTS_JSON}.
+   *
+   * @param sqlStmts SQL statements; must not be null or empty
+   * @return JSON representation of the list
+   */
+  public static String serializeSqlStmts(List<String> sqlStmts) {
+    Validate.notEmpty(sqlStmts, "sqlStmts is not set or empty");
+    return JsonUtil.toJson(sqlStmts);
+  }
+
+  /**
+   * Creates the {@link SourceResolver} named by {@link #CFG_SOURCE_RESOLVER} using the factory configured in
+   * that resolver's config domain.
+   *
+   * @param config full job config
+   * @return the source resolver
+   */
+  public static SourceResolver createSourceResolver(Config config) {
+    String sourceResolveValue = config.get(CFG_SOURCE_RESOLVER);
+    Validate.notEmpty(sourceResolveValue, "sourceResolver config is not set or empty");
+    return initializePlugin("SourceResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
+        (o, c) -> ((SourceResolverFactory) o).create(c));
+  }
+
+  /**
+   * Creates a {@link ConfigBasedUdfResolver} from the config domain of the resolver named by
+   * {@link #CFG_UDF_RESOLVER} and from the {@link #CFG_UDF_CONFIG_DOMAIN} domain.
+   */
+  private static UdfResolver createUdfResolver(Map<String, String> config) {
+    String udfResolveValue = config.get(CFG_UDF_RESOLVER);
+    Validate.notEmpty(udfResolveValue, "udfResolver config is not set or empty");
+    Map<String, String> domainConfig =
+        getDomainProperties(config, String.format(CFG_FMT_UDF_RESOLVER_DOMAIN, udfResolveValue), false);
+    Properties props = new Properties();
+    props.putAll(domainConfig);
+    Map<String, String> udfConfig = getDomainProperties(config, CFG_UDF_CONFIG_DOMAIN, false);
+    return new ConfigBasedUdfResolver(props, new MapConfig(udfConfig));
+  }
+
+  /**
+   * Returns the entries of {@code props} whose key starts with {@code prefix} (a trailing "." is appended to
+   * the prefix if missing). A key equal to the prefix itself is skipped; a blank prefix selects every entry.
+   *
+   * @param preserveFullKey if false, the prefix is stripped from the returned keys
+   */
+  private static Map<String, String> getDomainProperties(Map<String, String> props, String prefix,
+      boolean preserveFullKey) {
+    String fullPrefix;
+    if (StringUtils.isBlank(prefix)) {
+      fullPrefix = ""; // this will effectively retrieve all properties
+    } else {
+      fullPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
+    }
+    Map<String, String> ret = new HashMap<>();
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith(fullPrefix) && !key.equals(fullPrefix)) {
+        ret.put(preserveFullKey ? key : key.substring(fullPrefix.length()), entry.getValue());
+      }
+    }
+    return ret;
+  }
+
+  public List<String> getSql() {
+    return sql;
+  }
+
+  public List<QueryInfo> getQueryInfo() {
+    return queryInfo;
+  }
+
+  public Collection<UdfMetadata> getUdfMetadata() {
+    return udfMetadata;
+  }
+
+  public Map<String, SqlSystemStreamConfig> getInputSystemStreamConfigBySource() {
+    return inputSystemStreamConfigBySource;
+  }
+
+  public Map<String, SqlSystemStreamConfig> getOutputSystemStreamConfigsBySource() {
+    return outputSystemStreamConfigsBySource;
+  }
+
+  public Map<SystemStream, SamzaRelConverter> getSamzaRelConverters() {
+    return samzaRelConvertersBySystemStream;
+  }
+
+  public Map<SystemStream, RelSchemaProvider> getRelSchemaProviders() {
+    return relSchemaProvidersBySystemStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
new file mode 100644
index 0000000..57d889f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.runner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.Validate;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.runtime.AbstractApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.RemoteApplicationRunner;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Application runner implementation for SamzaSqlApplication.
+ * SamzaSqlApplication needs SamzaSqlConfigRewriter to infer some of the configs from SQL statements.
+ * Since Samza's config rewriting capability is available only in the RemoteApplicationRunner.
+ * This runner invokes the SamzaSqlConfig re-writer if it is invoked on a standalone mode (i.e. localRunner == true)
+ * otherwise directly calls the RemoteApplicationRunner which automatically performs the config rewriting .
+ */
+public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplicationRunner.class);
+
+  private final Config sqlConfig;
+  private final ApplicationRunner appRunner;
+  private final Boolean localRunner;
+
+  public static final String RUNNER_CONFIG = "app.runner.class";
+  public static final String CFG_FMT_SAMZA_STREAM_SYSTEM = "streams.%s.samza.system";
+
+  public SamzaSqlApplicationRunner(Config config) {
+    this(false, config);
+  }
+
+  public SamzaSqlApplicationRunner(Boolean localRunner, Config config) {
+    super(config);
+    this.localRunner = localRunner;
+    sqlConfig = computeSamzaConfigs(localRunner, config);
+    appRunner = ApplicationRunner.fromConfig(sqlConfig);
+  }
+
+  public static Config computeSamzaConfigs(Boolean localRunner, Config config) {
+    Map<String, String> newConfig = new HashMap<>();
+
+    SourceResolver sourceResolver = SamzaSqlApplicationConfig.createSourceResolver(config);
+    // Parse the sql and find the input stream streams
+    List<String> sqlStmts = SamzaSqlApplicationConfig.fetchSqlFromConfig(config);
+
+    // This is needed because the SQL file may not be available in all the node managers.
+    String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(sqlStmts);
+    newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson);
+
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlApplicationConfig.fetchQueryInfo(sqlStmts);
+    for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
+      // Populate stream to system mapping config for input and output system streams
+      for (String inputSource : query.getInputSources()) {
+        SqlSystemStreamConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
+        newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
+            inputSystemStreamConfig.getSystemName());
+      }
+      SqlSystemStreamConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
+      newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
+          outputSystemStreamConfig.getSystemName());
+    }
+
+    if (localRunner) {
+      newConfig.put(RUNNER_CONFIG, LocalApplicationRunner.class.getName());
+    } else {
+      newConfig.put(RUNNER_CONFIG, RemoteApplicationRunner.class.getName());
+    }
+
+    newConfig.putAll(config);
+
+    LOG.info("New Samza configs: " + newConfig);
+    return new MapConfig(newConfig);
+  }
+
+  public void runAndWaitForFinish() {
+    Validate.isTrue(localRunner, "This method can be called only in standalone mode.");
+    SamzaSqlApplication app = new SamzaSqlApplication();
+    run(app);
+    ((LocalApplicationRunner) appRunner).waitForFinish();
+  }
+
+  @Override
+  public void runTask() {
+    appRunner.runTask();
+  }
+
+  @Override
+  public void run(StreamApplication streamApp) {
+    Validate.isInstanceOf(SamzaSqlApplication.class, streamApp);
+    appRunner.run(streamApp);
+  }
+
+  @Override
+  public void kill(StreamApplication streamApp) {
+    appRunner.kill(streamApp);
+  }
+
+  @Override
+  public ApplicationStatus status(StreamApplication streamApp) {
+    return appRunner.status(streamApp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
new file mode 100644
index 0000000..8bbab86
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+
+/**
+ * Helper methods for working with configuration properties.
+ */
+public class ConfigUtil {
+
+  private ConfigUtil() {
+  }
+
+  /**
+   * Selects the properties whose key starts with the given prefix.
+   * A "." is appended to the prefix when it does not already end with one, and a key equal to the resulting
+   * prefix is never selected. A blank prefix selects every property.
+   * @param props Full set of properties; every key is cast to {@link String}
+   * @param prefix Prefix that a key must start with to be selected
+   * @param preserveFullKey If set to true, the returned keys are the original keys including the prefix.
+   *                        If set to false, the prefix is stripped from the returned keys.
+   * @return Returns a new property bag holding the selected properties
+   */
+  public static Properties getDomainProperties(Properties props, String prefix, boolean preserveFullKey) {
+    final String fullPrefix;
+    if (StringUtils.isBlank(prefix)) {
+      fullPrefix = ""; // this will effectively retrieve all properties
+    } else if (prefix.endsWith(".")) {
+      fullPrefix = prefix;
+    } else {
+      fullPrefix = prefix + ".";
+    }
+
+    Properties selected = new Properties();
+    for (Object rawKey : props.keySet()) {
+      String key = (String) rawKey;
+      if (!key.startsWith(fullPrefix) || key.equals(fullPrefix)) {
+        continue;
+      }
+      String selectedKey = preserveFullKey ? key : key.substring(fullPrefix.length());
+      selected.put(selectedKey, props.get(key));
+    }
+    return selected;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
new file mode 100644
index 0000000..ea78f41
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
@@ -0,0 +1,91 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility methods to aid serialization and deserialization of Json.
+ */
+public class JsonUtil {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final Logger LOG = LoggerFactory.getLogger(JsonUtil.class.getName());
+
+  static {
+    MAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private JsonUtil() {
+  }
+
+  /**
+   * Deserialize a JSON string into an object based on a type reference.
+   * This method allows the caller to specify precisely the desired output
+   * type for the target object.
+   * @param json JSON string
+   * @param typeRef type reference of the target object
+   * @param <T> type of the target object
+   * @return deserialized Java object
+   */
+  public static <T> T fromJson(String json, TypeReference<T> typeRef) {
+    Validate.notNull(json, "null JSON string");
+    Validate.notNull(typeRef, "null type reference");
+    T object;
+    try {
+      object = MAPPER.readValue(json, typeRef);
+    } catch (IOException e) {
+      String errorMessage = "Failed to parse json: " + json;
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    }
+    return object;
+  }
+
+  /**
+   * Serialize a Java object into JSON string.
+   * @param object object to be serialized
+   * @param <T> type of the input object
+   * @return JSON string
+   */
+  public static <T> String toJson(T object) {
+    Validate.notNull(object, "null input object");
+    StringWriter out = new StringWriter();
+    try {
+      MAPPER.writeValue(out, object);
+    } catch (IOException e) {
+      String errorMessage = "Failed to serialize object: " + object;
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    }
+    return out.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
new file mode 100644
index 0000000..4293c76
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.lang.reflect.Constructor;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to simplify usage of Java reflection.
+ */
+public class ReflectionUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
+
+  // Static helpers only; not meant to be instantiated.
+  private ReflectionUtils() {
+  }
+
+  /**
+   * Create an instance of the specified class using the declared constructor whose
+   * parameter types are the runtime classes of the supplied arguments.
+   * The lookup uses the runtime class of each argument, so a constructor declared
+   * with a supertype, an interface or a primitive parameter is not found, and a
+   * null argument makes the lookup fail. Every failure is logged at warn level
+   * and reported to the caller as a null return value.
+   * @param clazz fully qualified name of the class
+   * @param args constructor arguments
+   * @param <T> type of the class
+   * @return instance of the class, or null if anything went wrong
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T createInstance(String clazz, Object... args) {
+    Validate.notNull(clazz, "null class name");
+    try {
+      Class<T> target = (Class<T>) Class.forName(clazz);
+      Class<?>[] parameterTypes =
+          IntStream.range(0, args.length).mapToObj(i -> args[i].getClass()).toArray(Class<?>[]::new);
+      Constructor<T> matching = target.getDeclaredConstructor(parameterTypes);
+      return matching.newInstance(args);
+    } catch (Exception e) {
+      LOG.warn("Failed to create instance for: " + clazz, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
new file mode 100644
index 0000000..dd5f3bc
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -0,0 +1,188 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlAsOperator;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlUnnestOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Utility class that is used to parse the Samza sql query to figure out the inputs, outputs etc.
+ */
+public class SamzaSqlQueryParser {
+
+  /**
+   * Overall shape a statement must have before it is handed to the sql parser. Group 2 captures the
+   * select portion of the statement. Compiled once and shared: {@link Pattern} instances are immutable
+   * and safe for concurrent use, while a fresh {@link Matcher} is created for every call.
+   */
+  private static final Pattern INSERT_INTO_SQL_PATTERN =
+      Pattern.compile("insert into (.*) (select .* from (.*))", Pattern.CASE_INSENSITIVE);
+
+  private SamzaSqlQueryParser() {
+  }
+
+  /**
+   * Result of parsing a single statement: the select portion of the query together with the input
+   * sources and the output source that the statement refers to.
+   */
+  public static class QueryInfo {
+    private final List<String> inputSources;
+    private final String selectQuery;
+    private final String outputSource;
+
+    public QueryInfo(String selectQuery, List<String> inputSources, String outputSource) {
+      this.selectQuery = selectQuery;
+      this.outputSource = outputSource;
+      this.inputSources = inputSources;
+    }
+
+    public List<String> getInputSources() {
+      return inputSources;
+    }
+
+    public String getSelectQuery() {
+      return selectQuery;
+    }
+
+    public String getOutputSource() {
+      return outputSource;
+    }
+  }
+
+  /**
+   * Parse a statement of the form {@code insert into <output> select ... from <input>}.
+   *
+   * @param sql the sql statement to parse
+   * @return the select query text together with its single input source and its output source
+   * @throws SamzaException if the statement does not have the expected shape, cannot be parsed, is not
+   *         an insert whose source is a select, or does not resolve to exactly one input source
+   */
+  public static QueryInfo parseQuery(String sql) {
+    Matcher m = INSERT_INTO_SQL_PATTERN.matcher(sql);
+    if (!m.matches()) {
+      throw new SamzaException("Invalid query format");
+    }
+
+    Planner planner = createPlanner();
+    SqlNode sqlNode;
+    try {
+      sqlNode = planner.parse(sql);
+    } catch (SqlParseException e) {
+      throw new SamzaException(e);
+    }
+
+    if (!(sqlNode instanceof SqlInsert)) {
+      throw new SamzaException("Sql query is not of the expected format");
+    }
+    SqlInsert sqlInsert = (SqlInsert) sqlNode;
+    String outputSource = sqlInsert.getTargetTable().toString();
+    if (!(sqlInsert.getSource() instanceof SqlSelect)) {
+      throw new SamzaException("Sql query is not of the expected format");
+    }
+
+    // The select text comes from the regex match; the parsed tree is only used to locate the sources.
+    String selectQuery = m.group(2);
+    String inputSource = getInputFromSelectQuery((SqlSelect) sqlInsert.getSource());
+    return new QueryInfo(selectQuery, Collections.singletonList(inputSource), outputSource);
+  }
+
+  /**
+   * Build a planner on top of the root schema of a new in-process Calcite connection.
+   * NOTE(review): the connection opened here is never closed; confirm whether the root schema stays
+   * usable after the connection is closed before changing this.
+   */
+  private static Planner createPlanner() {
+    Connection connection;
+    SchemaPlus rootSchema;
+    try {
+      connection = DriverManager.getConnection("jdbc:calcite:");
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      rootSchema = calciteConnection.getRootSchema();
+    } catch (SQLException e) {
+      throw new SamzaException(e);
+    }
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
+        .defaultSchema(rootSchema)
+        .operatorTable(SqlStdOperatorTable.instance())
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .costFactory(null)
+        .build();
+    return Frameworks.getPlanner(frameworkConfig);
+  }
+
+  /**
+   * Resolve the single input source of a select.
+   *
+   * @throws SamzaException if the from clause does not resolve to exactly one source
+   */
+  private static String getInputFromSelectQuery(SqlSelect sqlSelect) {
+    List<String> input = new ArrayList<>();
+    getInput(sqlSelect.getFrom(), input);
+    if (input.size() != 1) {
+      throw new SamzaException("Unsupported query " + sqlSelect);
+    }
+
+    return input.get(0);
+  }
+
+  /**
+   * Walk a from-clause node and append the sources found to {@code inputSource}. Identifiers are added
+   * as they are, aliases and nested selects are followed, and an unnest over a select contributes that
+   * select's single source. Any other kind of node contributes nothing.
+   *
+   * @throws SamzaException if both sides of a join resolve to a source
+   */
+  private static void getInput(SqlNode node, List<String> inputSource) {
+    if (node instanceof SqlJoin) {
+      SqlJoin joinNode = (SqlJoin) node;
+      List<String> inputsLeft = new ArrayList<>();
+      List<String> inputsRight = new ArrayList<>();
+      getInput(joinNode.getLeft(), inputsLeft);
+      getInput(joinNode.getRight(), inputsRight);
+
+      if (!inputsLeft.isEmpty() && !inputsRight.isEmpty()) {
+        throw new SamzaException("Joins on two entities are not supported yet");
+      }
+
+      inputSource.addAll(inputsLeft);
+      inputSource.addAll(inputsRight);
+    } else if (node instanceof SqlIdentifier) {
+      inputSource.add(node.toString());
+    } else if (node instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = (SqlBasicCall) node;
+      if (basicCall.getOperator() instanceof SqlAsOperator) {
+        getInput(basicCall.operand(0), inputSource);
+      } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
+        inputSource.add(getInputFromSelectQuery(basicCall.operand(0)));
+      }
+    } else if (node instanceof SqlSelect) {
+      getInput(((SqlSelect) node).getFrom(), inputSource);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
new file mode 100644
index 0000000..ca355ad
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
@@ -0,0 +1,103 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility to read the .sql file and parse out the various sql statements in the file.
+ * Current behavior and limitations:
+ *     Samza SQL supports a sql file with multiple SQL statements where each SQL statement can be spread across
+ *       multiple lines.
+ *     It supports sql comments where a line starts with "--".
+ *     It cannot support multiple sql statements in a single line.
+ *     All the empty lines are ignored
+ *     All the SQL statements should start with "insert into".
+ *
+ * e.g. SQL File
+ * -- Sample comment
+ * insert into log.output1 select * from kafka.input1
+ *
+ * insert into log.output2
+ *   select * from kafka.input2
+ *
+ * -- You may have empty lines in between a single query.
+ * insert into log.output3
+ *
+ *   select * from kafka.input3
+ *
+ * -- Below line which contains multiple sql statements are not supported
+ * -- insert into log.output4 select * from kafka.input4 insert into log.output5 select * from kafka.input5
+ *
+ * -- Below SQL statement is not supported because it doesn't start with insert into
+ * -- select * from kafka.input6
+ */
+public class SqlFileParser {
+
+  private static final String INSERT_CMD = "insert";
+  private static final Logger LOG = LoggerFactory.getLogger(SqlFileParser.class);
+  private static final String SQL_COMMENT_PREFIX = "--";
+
+  private SqlFileParser() {
+  }
+
+  /**
+   * Read a sql file and split it into individual statements.
+   * A line that starts with "insert" (in any letter case) begins a new statement. Every following
+   * line that is neither blank nor a "--" comment is appended to the current statement, separated
+   * by a single space.
+   *
+   * @param fileName path of the sql file; must not be empty
+   * @return the statements found in the file, in file order
+   * @throws SamzaException if the file cannot be opened
+   */
+  public static List<String> parseSqlFile(String fileName) {
+    Validate.notEmpty(fileName, "fileName cannot be empty.");
+    List<String> sqlLines;
+    // Files.lines keeps the file open until the stream is closed, so close it as soon as it is drained.
+    try (Stream<String> lines = Files.lines(Paths.get(fileName))) {
+      sqlLines = lines.collect(Collectors.toList());
+    } catch (IOException e) {
+      String msg = String.format("Unable to parse the sql file %s", fileName);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+    List<String> sqlStmts = new ArrayList<>();
+    String lastStatement = "";
+    for (String sqlLine : sqlLines) {
+      String sql = sqlLine.trim();
+      // Case-insensitive prefix check that does not depend on the default locale.
+      if (sql.regionMatches(true, 0, INSERT_CMD, 0, INSERT_CMD.length())) {
+        // A new statement begins here; flush the one accumulated so far.
+        if (StringUtils.isNotEmpty(lastStatement)) {
+          sqlStmts.add(lastStatement);
+        }
+
+        lastStatement = sql;
+      } else if (StringUtils.isNotBlank(sql) && !sql.startsWith(SQL_COMMENT_PREFIX)) {
+        lastStatement = String.format("%s %s", lastStatement, sql);
+      }
+    }
+
+    // Flush the statement that was still being accumulated when the file ended.
+    if (!StringUtils.isWhitespace(lastStatement)) {
+      sqlStmts.add(lastStatement);
+    }
+    return sqlStmts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
new file mode 100644
index 0000000..e4bfcae
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Translator to translate the LogicalFilter node in the relational graph to the corresponding StreamGraph
+ * implementation
+ */
+public class FilterTranslator {
+
+  private static final Logger log = LoggerFactory.getLogger(FilterTranslator.class);
+
+  public void translate(final LogicalFilter filter, final TranslatorContext context) {
+    MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(filter.getInput().getId());
+    Expression expr =
+        context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
+
+    MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(message -> {
+      Object[] result = new Object[1];
+      expr.execute(context.getExecutionContext(), context.getDataContext(), message.getRelFieldValues().toArray(), result);
+      if (result.length > 0 && result[0] instanceof Boolean) {
+        boolean retVal = (Boolean) result[0];
+        log.debug(
+            String.format("return value for input %s is %s", Arrays.asList(message.getFieldValues()).toString(), retVal));
+        return retVal;
+      } else {
+        log.error("return value is not boolean");
+        return false;
+      }
+    });
+
+    context.registerMessageStream(filter.getId(), outputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
new file mode 100644
index 0000000..c0387ad
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -0,0 +1,108 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Translator to translate the Project node in the relational graph to the corresponding StreamGraph
+ * implementation.
+ */
+public class ProjectTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class);
+
+  /**
+   * Compile the project expressions and register, under the project's id, a stream in which every
+   * input message is replaced by a message holding the projected values. When exactly one projection
+   * is a "flatten" call, the input stream is first expanded by {@link #translateFlatten}.
+   *
+   * @param project the project node to translate
+   * @param context translation state holding the already registered input stream and the compiler
+   * @throws SamzaException if more than one projection is a flatten call
+   */
+  public void translate(final Project project, final TranslatorContext context) {
+    MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId());
+    List<Integer> flattenProjects =
+        project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
+
+    if (!flattenProjects.isEmpty()) {
+      if (flattenProjects.size() > 1) {
+        String msg = "Multiple flatten operators in a single query is not supported";
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      messageStream = translateFlatten(flattenProjects.get(0), messageStream);
+    }
+
+    Expression expr = context.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
+
+    // The number of output fields and their names depend only on the project node, so they are
+    // derived once here instead of once per message.
+    final RelDataType rowType = project.getRowType();
+    final int fieldCount = rowType.getFieldCount();
+    final List<String> fieldNames = project.getNamedProjects()
+        .stream()
+        .limit(fieldCount)
+        .map(namedProject -> namedProject.getValue())
+        .collect(Collectors.toList());
+
+    MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(m -> {
+      Object[] output = new Object[fieldCount];
+      expr.execute(context.getExecutionContext(), context.getDataContext(), m.getRelFieldValues().toArray(), output);
+      // Each message still receives its own list of names, so no list is shared between messages.
+      return SamzaSqlRelMessage.createRelMessage(Arrays.asList(output), new ArrayList<>(fieldNames));
+    });
+
+    context.registerMessageStream(project.getId(), outputStream);
+  }
+
+  /**
+   * Expand every message whose field at {@code flattenIndex} is a list into one message per list
+   * element, where the field holds a singleton list containing that element. Messages whose field is
+   * not a list pass through unchanged.
+   * NOTE(review): the field is read from getRelFieldValues() but replaced in a copy of
+   * getFieldValues(); confirm that both lists use the same positions.
+   */
+  private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
+      MessageStream<SamzaSqlRelMessage> inputStream) {
+    return inputStream.flatMap(message -> {
+      Object field = message.getRelFieldValues().get(flattenIndex);
+
+      // instanceof is false for null, so no separate null check is needed.
+      if (field instanceof List) {
+        List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
+        for (Object fieldValue : (List<?>) field) {
+          List<Object> newValues = new ArrayList<>(message.getFieldValues());
+          newValues.set(flattenIndex, Collections.singletonList(fieldValue));
+          outMessages.add(new SamzaSqlRelMessage(message.getKey(), message.getFieldNames(), newValues));
+        }
+        return outMessages;
+      } else {
+        return Collections.singletonList(message);
+      }
+    });
+  }
+
+  /** Whether the node is a call to a user defined function named "flatten" (case-insensitive). */
+  private boolean isFlatten(RexNode rexNode) {
+    return rexNode instanceof RexCall && ((RexCall) rexNode).op instanceof SqlUserDefinedFunction
+        && ((RexCall) rexNode).op.getName().equalsIgnoreCase("flatten");
+  }
+
+  /** Index of the input field referenced by the first operand of the call; the operand must be an input ref. */
+  private Integer getProjectIndex(RexNode rexNode) {
+    return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
new file mode 100644
index 0000000..ab17018
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.planner.QueryPlanner;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+
+
+/**
+ * This class is used to populate the StreamGraph using the SQL queries.
+ * This class contains the core of the SamzaSQL control code that converts the SQL statements to calcite relational graph.
+ * It then walks the relational graph and then populates the Samza's {@link StreamGraph} accordingly.
+ */
+public class QueryTranslator {
+
+  private final ScanTranslator scanTranslator;
+  private final SamzaSqlApplicationConfig sqlConfig;
+
+  public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
+    this.sqlConfig = sqlConfig;
+    this.scanTranslator = new ScanTranslator(sqlConfig.getSamzaRelConverters());
+  }
+
+  /**
+   * Plan the select query, translate every scan, filter and project node of the resulting plan into
+   * the stream graph, and send the stream registered for the plan's root to the query's output.
+   *
+   * @param queryInfo parsed query providing the select statement and the output source
+   * @param streamGraph stream graph to populate
+   */
+  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) {
+    QueryPlanner planner =
+        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
+            sqlConfig.getUdfMetadata());
+    final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
+    final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
+    final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext);
+    final RelNode rootNode = relRoot.project();
+
+    rootNode.accept(createTranslatingShuttle(context));
+
+    SqlSystemStreamConfig outputConfig =
+        sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
+    SamzaRelConverter outputConverter = sqlConfig.getSamzaRelConverters().get(outputConfig.getSystemStream());
+    MessageStreamImpl<SamzaSqlRelMessage> resultStream =
+        (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(rootNode.getId());
+    MessageStream<KV<Object, Object>> outputStream = resultStream.map(outputConverter::convertToSamzaMessage);
+    outputStream.sendTo(streamGraph.getOutputStream(outputConfig.getStreamName()));
+  }
+
+  /**
+   * Create the visitor that drives the translation. Each visit method first visits the node's
+   * children and only then translates the node itself.
+   */
+  private RelShuttleImpl createTranslatingShuttle(final TranslatorContext context) {
+    return new RelShuttleImpl() {
+      @Override
+      public RelNode visit(TableScan scan) {
+        RelNode visited = super.visit(scan);
+        scanTranslator.translate(scan, context);
+        return visited;
+      }
+
+      @Override
+      public RelNode visit(LogicalFilter filter) {
+        RelNode visited = visitChild(filter, 0, filter.getInput());
+        new FilterTranslator().translate(filter, context);
+        return visited;
+      }
+
+      @Override
+      public RelNode visit(LogicalProject project) {
+        RelNode visited = super.visit(project);
+        new ProjectTranslator().translate(project, context);
+        return visited;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
new file mode 100644
index 0000000..202bdbd
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -0,0 +1,70 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+
+import com.google.common.base.Joiner;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
+ * implementation
+ */
+public class ScanTranslator {
+
+  private final Map<SystemStream, SamzaRelConverter> relMsgConverters;
+
+  public ScanTranslator(Map<SystemStream, SamzaRelConverter> converters) {
+    relMsgConverters = converters;
+  }
+
+  /**
+   * Register, under the scan's id, the input stream of the scanned table with every incoming message
+   * converted to a {@link SamzaSqlRelMessage}. The qualified table name must have exactly two parts,
+   * system name then stream name, and a converter must be configured for that system stream.
+   *
+   * @param tableScan the table scan node to translate
+   * @param context translation state providing the stream graph
+   */
+  public void translate(final TableScan tableScan, final TranslatorContext context) {
+    final StreamGraph streamGraph = context.getStreamGraph();
+    final List<String> qualifiedName = tableScan.getTable().getQualifiedName();
+    Validate.isTrue(qualifiedName.size() == 2,
+        String.format("table name %s is not of the format <SystemName>.<StreamName>",
+            Joiner.on(".").join(qualifiedName)));
+
+    final String systemName = qualifiedName.get(0);
+    final String streamName = qualifiedName.get(1);
+    final SystemStream systemStream = new SystemStream(systemName, streamName);
+    Validate.isTrue(relMsgConverters.containsKey(systemStream), String.format("Unknown system %s", systemName));
+
+    final SamzaRelConverter converter = relMsgConverters.get(systemStream);
+    final MessageStream<KV<Object, Object>> rawStream = streamGraph.getInputStream(streamName);
+    final MessageStream<SamzaSqlRelMessage> relMessageStream = rawStream.map(converter::convertToRelMessage);
+
+    context.registerMessageStream(tableScan.getId(), relMessageStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
new file mode 100644
index 0000000..fd5195b
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -0,0 +1,162 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+
+
+/**
+ * State that is maintained while translating the Calcite relational graph to Samza {@link StreamGraph}.
+ */
+public class TranslatorContext {
+  private final StreamGraph streamGraph;
+  /** Message streams produced so far during translation, keyed by the id they were registered under. */
+  private final Map<Integer, MessageStream> messageStreams = new HashMap<>();
+  private final RexToJavaCompiler compiler;
+
+  private final SamzaSqlExecutionContext executionContext;
+  private final DataContextImpl dataContext;
+
+  /**
+   * Minimal {@link DataContext} that only resolves the current-timestamp variable; the root schema, type factory
+   * and query provider are not available and are returned as null.
+   */
+  private static class DataContextImpl implements DataContext {
+
+    @Override
+    public SchemaPlus getRootSchema() {
+      return null;
+    }
+
+    @Override
+    public JavaTypeFactory getTypeFactory() {
+      return null;
+    }
+
+    @Override
+    public QueryProvider getQueryProvider() {
+      return null;
+    }
+
+    /**
+     * Looks up a context variable by name.
+     *
+     * @param name name of the variable
+     * @return the current wall-clock time in milliseconds for the current-timestamp variable, otherwise null
+     */
+    @Override
+    public Object get(String name) {
+      // Compare from the constant side so that a null name yields null like any other unknown variable.
+      if (Variable.CURRENT_TIMESTAMP.camelName.equals(name)) {
+        return System.currentTimeMillis();
+      }
+
+      return null;
+    }
+  }
+
+  private static class SamzaSqlRexBuilder extends RexBuilder {
+    private SamzaSqlRexBuilder(RelDataTypeFactory typeFactory) {
+      super(typeFactory);
+    }
+
+    /**
+     * ensureType() is overridden to return the node unchanged, so that no cast functions are added to the
+     * expressions.
+     * NOTE(review): the original comment attributed this to Drill's rules for implicit casting; presumably it was
+     * carried over from another project. Confirm the rationale for Samza SQL.
+     */
+    @Override
+    public RexNode ensureType(RelDataType type, RexNode node, boolean matchNullability) {
+      return node;
+    }
+  }
+
+  /**
+   * Create the instance of TranslatorContext
+   * @param streamGraph Samza's streamGraph that is populated during the translation.
+   * @param relRoot Root of the relational graph from calcite.
+   * @param executionContext the execution context
+   */
+  public TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext) {
+    this.streamGraph = streamGraph;
+    this.compiler = createExpressionCompiler(relRoot);
+    this.executionContext = executionContext;
+    this.dataContext = new DataContextImpl();
+  }
+
+  /**
+   * Gets stream graph.
+   *
+   * @return the stream graph
+   */
+  public StreamGraph getStreamGraph() {
+    return streamGraph;
+  }
+
+  /**
+   * Builds the expression compiler on top of a {@link SamzaSqlRexBuilder} that uses the type factory of the
+   * relational graph's cluster.
+   */
+  private RexToJavaCompiler createExpressionCompiler(RelRoot relRoot) {
+    RelDataTypeFactory dataTypeFactory = relRoot.project().getCluster().getTypeFactory();
+    RexBuilder rexBuilder = new SamzaSqlRexBuilder(dataTypeFactory);
+    return new RexToJavaCompiler(rexBuilder);
+  }
+
+  /**
+   * Gets execution context.
+   *
+   * @return the execution context
+   */
+  public SamzaSqlExecutionContext getExecutionContext() {
+    return executionContext;
+  }
+
+  /**
+   * Gets data context.
+   *
+   * @return the data context created together with this translator context
+   */
+  public DataContext getDataContext() {
+    return dataContext;
+  }
+
+  /**
+   * Gets expression compiler.
+   *
+   * @return the expression compiler
+   */
+  public RexToJavaCompiler getExpressionCompiler() {
+    return compiler;
+  }
+
+  /**
+   * Register message stream. A stream registered earlier under the same id is replaced.
+   *
+   * @param id the id
+   * @param stream the stream
+   */
+  public void registerMessageStream(int id, MessageStream stream) {
+    messageStreams.put(id, stream);
+  }
+
+  /**
+   * Gets message stream.
+   *
+   * @param id the id
+   * @return the message stream registered under the id, or null when none has been registered
+   */
+  public MessageStream getMessageStream(int id) {
+    return messageStreams.get(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
new file mode 100644
index 0000000..7fa9974
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -0,0 +1,103 @@
+package org.apache.samza.sql;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.translator.QueryTranslator;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestQueryTranslator {
+
+  @Test
+  public void testTranslate() {
+    translateAndVerify("Insert into testavro.outputTopic select MyTest(id) from testavro.SIMPLE1", "SIMPLE1");
+  }
+
+  // TODO: a windowed variant of this query was previously kept here commented out:
+  //   "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1
+  //    GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value"
+  // Turn it into its own test case when it is meant to be covered.
+  @Test
+  public void testTranslateComplex() {
+    translateAndVerify("Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1",
+        "COMPLEX1");
+  }
+
+  @Test
+  public void testTranslateSubQuery() {
+    translateAndVerify(
+        "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)",
+        "COMPLEX1");
+  }
+
+  /**
+   * Translates the given SQL statement into a fresh stream graph and verifies that the graph has exactly one
+   * output stream (testavro.outputTopic) and exactly one input operator on the expected testavro stream.
+   *
+   * @param sql the SQL statement to translate
+   * @param expectedInputStream physical name expected for the single input stream
+   */
+  private void translateAndVerify(String sql, String expectedInputStream) {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, streamGraph);
+
+    Assert.assertEquals(1, streamGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("outputTopic", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(1, streamGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals(expectedInputStream,
+        streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+  }
+}


[3/3] samza git commit: Samza SQL implementation for basic projects, filtering and UDFs

Posted by xi...@apache.org.
Samza SQL implementation for basic projects, filtering and UDFs

## Samza SQL implementation for basic projects, filtering and UDFs

## Design document:
https://docs.google.com/document/d/1bE-ZuPfTpntm1hT3GwQEShYDiTqU3IkxeP4-3ZcGHgU/edit?usp=sharing

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #295 from srinipunuru/samza-sql.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9fa8beed
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9fa8beed
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9fa8beed

Branch: refs/heads/master
Commit: 9fa8beed7ac37c33a0c01412b075a9c9a0b47441
Parents: 2e2e00e
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Thu Nov 16 11:14:32 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Nov 16 11:14:32 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  33 ++-
 gradle/dependency-versions.gradle               |   2 +
 .../org/apache/samza/sql/udfs/ScalarUdf.java    |  48 ++++
 .../apache/samza/sql/avro/AvroRelConverter.java | 183 ++++++++++++++
 .../samza/sql/avro/AvroRelConverterFactory.java |  44 ++++
 .../samza/sql/avro/AvroRelSchemaProvider.java   |  28 +++
 .../samza/sql/avro/AvroTypeFactoryImpl.java     | 132 ++++++++++
 ...ConfigBasedAvroRelSchemaProviderFactory.java |  63 +++++
 .../org/apache/samza/sql/data/Expression.java   |  38 +++
 .../samza/sql/data/RexToJavaCompiler.java       | 224 +++++++++++++++++
 .../sql/data/SamzaSqlExecutionContext.java      |  61 +++++
 .../samza/sql/data/SamzaSqlRelMessage.java      | 123 ++++++++++
 .../org/apache/samza/sql/fn/FlattenUdf.java     |  36 +++
 .../impl/ConfigBasedSourceResolverFactory.java  |  71 ++++++
 .../samza/sql/impl/ConfigBasedUdfResolver.java  |  97 ++++++++
 .../samza/sql/interfaces/RelSchemaProvider.java |  36 +++
 .../interfaces/RelSchemaProviderFactory.java    |  33 +++
 .../samza/sql/interfaces/SamzaRelConverter.java |  46 ++++
 .../interfaces/SamzaRelConverterFactory.java    |  39 +++
 .../samza/sql/interfaces/SourceResolver.java    |  34 +++
 .../sql/interfaces/SourceResolverFactory.java   |  36 +++
 .../sql/interfaces/SqlSystemStreamConfig.java   |  74 ++++++
 .../samza/sql/interfaces/UdfMetadata.java       |  61 +++++
 .../samza/sql/interfaces/UdfResolver.java       |  35 +++
 .../org/apache/samza/sql/planner/Checker.java   |  93 +++++++
 .../apache/samza/sql/planner/QueryPlanner.java  | 153 ++++++++++++
 .../sql/planner/SamzaSqlOperatorTable.java      | 101 ++++++++
 .../sql/planner/SamzaSqlScalarFunctionImpl.java |  84 +++++++
 .../sql/planner/SamzaSqlUdfOperatorTable.java   |  62 +++++
 .../samza/sql/runner/SamzaSqlApplication.java   |  56 +++++
 .../sql/runner/SamzaSqlApplicationConfig.java   | 245 +++++++++++++++++++
 .../sql/runner/SamzaSqlApplicationRunner.java   | 133 ++++++++++
 .../apache/samza/sql/testutil/ConfigUtil.java   |  62 +++++
 .../org/apache/samza/sql/testutil/JsonUtil.java |  91 +++++++
 .../samza/sql/testutil/ReflectionUtils.java     |  62 +++++
 .../samza/sql/testutil/SamzaSqlQueryParser.java | 188 ++++++++++++++
 .../samza/sql/testutil/SqlFileParser.java       | 103 ++++++++
 .../samza/sql/translator/FilterTranslator.java  |  62 +++++
 .../samza/sql/translator/ProjectTranslator.java | 108 ++++++++
 .../samza/sql/translator/QueryTranslator.java   |  96 ++++++++
 .../samza/sql/translator/ScanTranslator.java    |  70 ++++++
 .../samza/sql/translator/TranslatorContext.java | 162 ++++++++++++
 .../apache/samza/sql/TestQueryTranslator.java   | 103 ++++++++
 .../sql/TestSamzaSqlApplicationConfig.java      |  92 +++++++
 .../samza/sql/TestSamzaSqlFileParser.java       |  58 +++++
 .../samza/sql/TestSamzaSqlQueryParser.java      |  70 ++++++
 .../samza/sql/TestSamzaSqlRelMessage.java       |  46 ++++
 .../samza/sql/avro/TestAvroRelConversion.java   | 239 ++++++++++++++++++
 .../samza/sql/avro/schemas/ComplexRecord.avsc   | 143 +++++++++++
 .../samza/sql/avro/schemas/ComplexRecord.java   |  92 +++++++
 .../apache/samza/sql/avro/schemas/MyFixed.java  |  29 +++
 .../samza/sql/avro/schemas/SimpleRecord.avsc    |  39 +++
 .../samza/sql/avro/schemas/SimpleRecord.java    |  52 ++++
 .../samza/sql/avro/schemas/SubRecord.java       |  53 ++++
 .../samza/sql/avro/schemas/TestEnumType.java    |  31 +++
 .../samza/sql/e2e/TestSamzaSqlEndToEnd.java     | 137 +++++++++++
 .../sql/system/ConsoleLoggingSystemFactory.java |  83 +++++++
 .../samza/sql/system/SimpleSystemAdmin.java     |  61 +++++
 .../samza/sql/system/TestAvroSystemFactory.java | 156 ++++++++++++
 .../samza/sql/testutil/MyTestArrayUdf.java      |  37 +++
 .../apache/samza/sql/testutil/MyTestUdf.java    |  45 ++++
 .../samza/sql/testutil/SamzaSqlTestConfig.java  | 103 ++++++++
 samza-sql/src/test/resources/log4j.xml          |  43 ++++
 settings.gradle                                 |   5 +-
 64 files changed, 5218 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index da08ee9..59ff5f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -104,6 +104,12 @@ allprojects {
   }
 }
 
+idea {
+  project {
+    languageLevel = 1.8
+  }
+}
+
 subprojects {
   apply plugin: 'eclipse'
   apply plugin: 'project-report'
@@ -185,7 +191,6 @@ project(":samza-core_$scalaVersion") {
   }
 }
 
-
 project(':samza-azure') {
   apply plugin: 'java'
   apply plugin: 'checkstyle'
@@ -215,7 +220,6 @@ project(':samza-azure') {
   }
 }
 
-
 project(":samza-autoscaling_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'
@@ -269,7 +273,24 @@ project(':samza-elasticsearch') {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
 
-    // Logging in tests is good.
+    testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
+  }
+}
+
+project(':samza-sql') {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-kafka_$scalaVersion")
+    compile "org.apache.avro:avro:$avroVersion"
+    compile "org.apache.calcite:calcite-core:$calciteVersion"
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+
+    testCompile project(":samza-test_$scalaVersion")
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 }
@@ -449,7 +470,7 @@ project(":samza-shell") {
   }
 
   // Usage: ./gradlew samza-shell:runJob \
-  //    -PconfigPath=file:///path/to/job/config.properties
+  //  -PconfigPath=file:///path/to/job/config.properties
   task runJob(type:JavaExec) {
     description 'To run a job (defined in a properties file)'
     main = 'org.apache.samza.job.JobRunner'
@@ -459,7 +480,7 @@ project(":samza-shell") {
   }
 
   // Usage: ./gradlew samza-shell:checkpointTool \
-  //    -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
+  //  -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
   task checkpointTool(type:JavaExec) {
     description 'Command-line tool to inspect and manipulate the job’s checkpoint'
     main = 'org.apache.samza.checkpoint.CheckpointTool'
@@ -470,7 +491,7 @@ project(":samza-shell") {
   }
 
   // Usage: ./gradlew samza-shell:kvPerformanceTest
-  //    -PconfigPath=file:///path/to/job/config.properties
+  //  -PconfigPath=file:///path/to/job/config.properties
   task kvPerformanceTest(type:JavaExec) {
     description 'Command-line tool to run key-value performance tests'
     main = 'org.apache.samza.test.performance.TestKeyValuePerformance'

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a8af9a8..4f467ab 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -18,6 +18,8 @@
  */
  ext {
   apacheCommonsCollections4Version = "4.0"
+  avroVersion = "1.7.0"
+  calciteVersion = "1.14.0"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
   commonsHttpClientVersion = "3.1"

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java b/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
new file mode 100644
index 0000000..719cace
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.udfs;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * The base interface for scalar UDFs. Every scalar UDF class needs to implement this interface and provide a method
+ * named "execute". The number and type of arguments for the execute method in the UDF class should match the number
+ * and type of fields used while invoking this UDF in the SQL statement.
+ * For example, say a user creates a UDF class with the signature int execute(int var1, String var2). It can be used
+ * in a SQL query such as
+ *     select myudf(id, name) from profile
+ * In the above query, profile should contain a field named 'id' of type INTEGER/NUMBER and a field named 'name' of
+ * type VARCHAR/CHARACTER.
+ */
+public interface ScalarUdf {
+  /**
+   * UDFs can implement this method to perform any initialization that they may need.
+   * @param udfConfig Config specific to the UDF.
+   */
+  void init(Config udfConfig);
+
+  /**
+   * Actual implementation of the UDF.
+   * @param args
+   *   all the arguments that the UDF needs, in the order in which they are passed
+   * @return
+   *   Return value from the scalar UDF.
+   */
+  Object execute(Object... args);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
new file mode 100644
index 0000000..1c17295
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -0,0 +1,183 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class converts a Samza Avro messages to Relational messages and vice versa.
+ * This supports Samza messages where Key is a string and Value is an avro record.
+ *
+ * Conversion from Samza to Relational Message :
+ *     The key part of the samza message is represented as a special column {@link SamzaSqlRelMessage#KEY_NAME}
+ *     in relational message.
+ *
+ *     The value part of the samza message is expected to be {@link IndexedRecord}, All the fields in the IndexedRecord form
+ *     the corresponding fields of the relational message.
+ *
+ * Conversion from Relational to Samza Message :
+ *     This converts the Samza relational message into Avro {@link GenericRecord}.
+ *     All the fields of the relational message is become fields of the Avro GenericRecord except of the field with name
+ *     {@link SamzaSqlRelMessage#KEY_NAME}. This special field becomes the Key in the output Samza message.
+ */
+public class AvroRelConverter implements SamzaRelConverter {
+
+  protected final Config config;
+  private final Schema avroSchema;
+  private final RelDataType relationalSchema;
+
+  /**
+   * Class that converts the avro field to their corresponding relational fields
+   * Array fields are converted from Avro {@link org.apache.avro.generic.GenericData.Array} to {@link ArrayList}
+   */
+  public enum AvroToRelObjConverter {
+
+    /**
+     * If the relational field type is ArraySqlType, We expect the avro field to be of type either
+     * {@link GenericData.Array} or {@link List} which then is converted to Rel field of type {@link ArrayList}
+     */
+    ArraySqlType {
+      @Override
+      Object convert(Object avroObj) {
+        ArrayList<Object> retVal = new ArrayList<>();
+        if (avroObj != null) {
+          if (avroObj instanceof GenericData.Array) {
+            retVal.addAll(((GenericData.Array) avroObj));
+          } else if (avroObj instanceof List) {
+            retVal.addAll((List) avroObj);
+          }
+        }
+
+        return retVal;
+      }
+    },
+
+    /**
+     * If the relational field type is MapSqlType, We expect the avro field to be of type
+     * {@link Map}
+     */
+    MapSqlType {
+      @Override
+      Object convert(Object obj) {
+        Map<String, Object> retVal = new HashMap<>();
+        if (obj != null) {
+          retVal.putAll((Map<String, ?>) obj);
+        }
+        return retVal;
+      }
+    },
+
+    /**
+     * If the relational field type is RelRecordType, The field is considered an object
+     * and moved to rel field without any translation.
+     */
+    RelRecordType {
+      @Override
+      Object convert(Object obj) {
+        return obj;
+      }
+    },
+
+    /**
+     * If the relational field type is BasicSqlType, The field is moved to rel field without any translation.
+     */
+    BasicSqlType {
+      @Override
+      Object convert(Object obj) {
+        return obj;
+      }
+    };
+
+    abstract Object convert(Object obj);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);
+
+  private final Schema arraySchema = Schema.parse(
+      "{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]},\"java-class\":\"java.util.List\"}");
+  private final Schema mapSchema = Schema.parse(
+      "{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]}}");
+
+
+  /**
+   * Creates a converter for one system stream.
+   *
+   * @param systemStream stream whose Avro schema is requested from the schema provider
+   * @param schemaProvider supplies both the relational schema and the Avro schema string that is parsed here
+   * @param config configuration, kept for use by subclasses
+   */
+  public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
+    this.config = config;
+    this.relationalSchema = schemaProvider.getRelationalSchema();
+    this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
+  }
+
+  /**
+   * Converts a Samza message into a relational message that carries one value per field of the relational schema.
+   *
+   * @param samzaMessage message whose value is either an {@link IndexedRecord} or null
+   * @return relational message built from the message key, the relational field names and the field values;
+   *         for a null value every field value is null
+   * @throws SamzaException if the value is neither null nor an {@link IndexedRecord}
+   */
+  @Override
+  public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
+    List<Object> values = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+    Object value = samzaMessage.getValue();
+    if (value instanceof IndexedRecord) {
+      IndexedRecord record = (IndexedRecord) value;
+      fieldNames.addAll(relationalSchema.getFieldNames());
+      // Each relational field is read from the record at the position of the same-named Avro field.
+      values.addAll(relationalSchema.getFieldList()
+          .stream()
+          .map(x -> getRelField(x.getType(), record.get(this.avroSchema.getField(x.getName()).pos())))
+          .collect(Collectors.toList()));
+    } else if (value == null) {
+      fieldNames.addAll(relationalSchema.getFieldNames());
+      // One null per field keeps names and values aligned, exactly as in the record branch above.
+      IntStream.range(0, fieldNames.size()).forEach(i -> values.add(null));
+    } else {
+      String msg = "Avro message converter doesn't support messages of type " + value.getClass();
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, values);
+  }
+
+  /**
+   * Converts a relational message into a Samza message whose value is an Avro {@link GenericRecord}.
+   *
+   * @param relMessage relational message to convert
+   * @return key/value pair of the relational message key and a record, created against this converter's Avro
+   *         schema, into which every field of the relational message has been put by name
+   */
+  @Override
+  public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+    GenericRecord record = new GenericData.Record(this.avroSchema);
+    List<String> fieldNames = relMessage.getFieldNames();
+    List<Object> values = relMessage.getFieldValues();
+    for (int index = 0; index < fieldNames.size(); index++) {
+      record.put(fieldNames.get(index), values.get(index));
+    }
+
+    return new KV<>(relMessage.getKey(), record);
+  }
+
+  /**
+   * Converts a single Avro field value using the {@link AvroToRelObjConverter} constant whose name equals the
+   * simple class name of the relational type.
+   */
+  private Object getRelField(RelDataType relType, Object avroObj) {
+    return AvroToRelObjConverter.valueOf(relType.getClass().getSimpleName()).convert(avroObj);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
new file mode 100644
index 0000000..278735f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.HashMap;
+import java.util.Properties;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Avro Schema Resolver that uses static config to return a schema for a SystemStream.
+ * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ */
+public class AvroRelConverterFactory implements SamzaRelConverterFactory {
+
+  private final HashMap<SystemStream, SamzaRelConverter> relConverters = new HashMap<>();
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider schemaProvider, Config config) {
+    return relConverters.computeIfAbsent(systemStream,
+        ss -> new AvroRelConverter(ss, (AvroRelSchemaProvider) schemaProvider, config));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
new file mode 100644
index 0000000..fb11624
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
@@ -0,0 +1,28 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * {@link RelSchemaProvider} that can additionally return the schema of a stream as a string.
+ */
+public interface AvroRelSchemaProvider extends RelSchemaProvider {
+  /**
+   * Returns the schema associated with the given stream.
+   *
+   * @param systemStream stream whose schema is requested
+   * @return the schema as a string (presumably the Avro schema JSON; confirm against implementations)
+   */
+  String getSchema(SystemStream systemStream);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
new file mode 100644
index 0000000..74e15e9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -0,0 +1,132 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory that creates the Calcite relational types from the Avro Schema. This is used by the
+ * AvroRelConverter to convert the Avro schema to calcite relational schema.
+ */
+public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroTypeFactoryImpl.class);
+
+  public AvroTypeFactoryImpl() {
+    super(RelDataTypeSystem.DEFAULT);
+  }
+
+  public RelDataType createType(Schema schema) {
+    Schema.Type type = schema.getType();
+    if (type != Schema.Type.RECORD) {
+      String msg =
+          String.format("System supports only RECORD as top level avro type, But the Schema's type is %s", type);
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    List<RelDataTypeField> relFields = getRelFields(schema.getFields());
+    return new RelRecordType(relFields);
+  }
+
+  private List<RelDataTypeField> getRelFields(List<Schema.Field> fields) {
+    List<RelDataTypeField> relFields = new ArrayList<>();
+
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      int fieldPos = field.pos() + 1;
+      RelDataType dataType = getRelDataType(field.schema());
+      relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
+    }
+
+    return relFields;
+  }
+
+  private RelDataType getRelDataType(Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+      case ARRAY:
+        // TODO JavaTypeFactoryImpl should convert Array into Array(ANY, ANY)
+        // return new ArraySqlType(createSqlType(SqlTypeName.ANY), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+      case BOOLEAN:
+        return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), true);
+      case DOUBLE:
+        return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), true);
+      case FLOAT:
+        return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), true);
+      case ENUM:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+      case UNION:
+        return getRelTypeFromUnionTypes(fieldSchema.getTypes());
+      case FIXED:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), true);
+      case STRING:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+      case BYTES:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), true);
+      case INT:
+        return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER), true);
+      case LONG:
+        return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true);
+      case RECORD:
+//        List<RelDataTypeField> relFields = getRelFields(fieldSchema);
+//        return new RelRecordType(relFields);
+        // TODO Calcite execution engine doesn't support record type yet.
+        return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+      case MAP:
+        // JavaTypeFactoryImpl converts map into Map(ANY, ANY)
+        return super.createMapType(createTypeWithNullability(createSqlType(SqlTypeName.ANY), true),
+            createTypeWithNullability(createSqlType(SqlTypeName.ANY), true));
+      default:
+        String msg = String.format("Field Type %s is not supported", fieldSchema.getType());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+    }
+  }
+
+  private RelDataType getRelTypeFromUnionTypes(List<Schema> types) {
+    // Typically a nullable field's schema is configured as an union of Null and a Type.
+    // This is to check whether the Union is a Nullable field
+    if (types.size() == 2) {
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return getRelDataType(types.get(1));
+      } else if ((types.get(1).getType() == Schema.Type.NULL)) {
+        return getRelDataType(types.get(0));
+      }
+    }
+
+    return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..4397a75
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory for schema providers that resolve the Avro schema of a SystemStream from static config.
+ * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ */
+public class ConfigBasedAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+  /** Format of the config key holding a stream's schema; the arguments are system name and stream name. */
+  public static final String CFG_SOURCE_SCHEMA = "%s.%s.schema";
+
+  /**
+   * Creates a schema provider bound to the given stream and config.
+   *
+   * @param systemStream stream whose schema the provider resolves
+   * @param config config that contains the schema entry
+   * @return a new {@link ConfigBasedAvroRelSchemaProvider}
+   */
+  @Override
+  public RelSchemaProvider create(SystemStream systemStream, Config config) {
+    return new ConfigBasedAvroRelSchemaProvider(systemStream, config);
+  }
+
+  /**
+   * Schema provider that looks up the Avro schema string in the config and converts it to a relational type.
+   */
+  public static class ConfigBasedAvroRelSchemaProvider implements AvroRelSchemaProvider {
+    private final Config config;
+    private final SystemStream systemStream;
+
+    public ConfigBasedAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+      this.systemStream = systemStream;
+      this.config = config;
+    }
+
+    /**
+     * Parses the configured Avro schema of the bound stream and converts it to a Calcite type.
+     *
+     * @return relational type corresponding to the configured Avro schema
+     * @throws org.apache.samza.SamzaException if no schema is configured for the stream
+     */
+    @Override
+    public RelDataType getRelationalSchema() {
+      String schemaStr = getSchema(systemStream);
+      if (schemaStr == null) {
+        // Fail with the offending key instead of an opaque NullPointerException from the parser.
+        throw new org.apache.samza.SamzaException(String.format("No Avro schema is configured under the key %s",
+            String.format(CFG_SOURCE_SCHEMA, systemStream.getSystem(), systemStream.getStream())));
+      }
+      // Schema.parse(String) is deprecated; Schema.Parser is the supported entry point.
+      Schema schema = new Schema.Parser().parse(schemaStr);
+      return new AvroTypeFactoryImpl().createType(schema);
+    }
+
+    /**
+     * Reads the schema string for the given stream from the config.
+     *
+     * @param systemStream stream whose schema entry is looked up
+     * @return the configured value, or whatever {@code config.get} yields when the key is absent
+     */
+    @Override
+    public String getSchema(SystemStream systemStream) {
+      return config.get(String.format(CFG_SOURCE_SCHEMA, systemStream.getSystem(), systemStream.getStream()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java b/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
new file mode 100644
index 0000000..9386e31
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
@@ -0,0 +1,38 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import org.apache.calcite.DataContext;
+
+
+/**
+ * {@link RexToJavaCompiler} creates the Java code for each relational expression at runtime.
+ * This is the interface that the generated Java code for each relational expression implements.
+ */
+public interface Expression {
+  /**
+   * Evaluates the expression for one row: reads the input columns and writes one or more result values.
+   * @param context the Samza SQL execution context
+   * @param root the Calcite data context
+   * @param inputValues all the relational columns of the row being processed
+   * @param results array that receives the values computed by the java code corresponding to the
+   *                relational expression
+   */
+  void execute(SamzaSqlExecutionContext context, DataContext root, Object[] inputValues, Object[] results);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
new file mode 100644
index 0000000..21c81a8
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.Pair;
+import org.apache.samza.SamzaException;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.commons.compiler.CompilerFactoryFactory;
+import org.codehaus.commons.compiler.IClassBodyEvaluator;
+import org.codehaus.commons.compiler.ICompilerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Defines a SQL row expression to a java class ({@link org.apache.samza.sql.data.Expression}) compiler.
+ *
+ * <p>This is based on Calcite's {@link org.apache.calcite.interpreter.JaninoRexCompiler}. It first generates
+ * a Java AST and then compiles it to a class using Janino.</p>
+ */
+public class RexToJavaCompiler {
+  private static final Logger log = LoggerFactory.getLogger(RexToJavaCompiler.class);
+
+  private final RexBuilder rexBuilder;
+
+  public RexToJavaCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  /**
+   * Compiles a relational expression to an instance of {@link Expression}
+   *
+   * for e.g.
+   *    Query : select id from profile
+   *      where profile table has relational schema with id(NUMBER) and name(VARCHAR) columns.
+   *    This query will result in the following relational plan
+   *      LogicalProject(id=[$1])
+   *        LogicalTableScan(table=[[profile]])
+   *
+   *    And the corresponding expressions are
+   *       inputs : EnumerableTableScan (Which is the output of LogicalTableScan)
+   *       nodes : [$1] Which essentially means pick the column at index 1 from the input
+   *
+   *
+   *    This function converts the LogicalProject expression "[$1]" with input RexNode which is an output of TableScan
+   *    to a java code that implements the interface {@link Expression}
+   *
+   * @param inputs Input relations/time-varying relations for this row expression
+   * @param nodes relational expressions that need to be converted to java code.
+   * @return compiled expression of type {@link org.apache.samza.sql.data.Expression}
+   */
+  public org.apache.samza.sql.data.Expression compile(List<RelNode> inputs, List<RexNode> nodes) {
+    /*
+     *  In case there are multiple input relations, we build a single input row type combining types of all the inputs.
+     */
+    final RelDataTypeFactory.FieldInfoBuilder fieldBuilder = rexBuilder.getTypeFactory().builder();
+    for (RelNode input : inputs) {
+      fieldBuilder.addAll(input.getRowType().getFieldList());
+    }
+    final RelDataType inputRowType = fieldBuilder.build();
+    final RexProgramBuilder programBuilder = new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+    final RexProgram program = programBuilder.getProgram();
+
+    final BlockBuilder builder = new BlockBuilder();
+    final ParameterExpression executionContext = Expressions.parameter(SamzaSqlExecutionContext.class, "context");
+    final ParameterExpression root = DataContext.ROOT;
+    final ParameterExpression inputValues = Expressions.parameter(Object[].class, "inputValues");
+    final ParameterExpression outputValues = Expressions.parameter(Object[].class, "outputValues");
+    final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    // Input references in the program are read from the Object[] variable named "inputValues",
+    // i.e. the same name as the inputValues parameter of the generated execute method.
+    final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList.of(
+        Pair.<org.apache.calcite.linq4j.tree.Expression, PhysType>of(
+            Expressions.variable(Object[].class, "inputValues"),
+            PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false))));
+
+    final List<org.apache.calcite.linq4j.tree.Expression> list =
+        RexToLixTranslator.translateProjects(program, javaTypeFactory, builder, null, DataContext.ROOT, inputGetter,
+            null);
+    // Emit "outputValues[i] = <translated project i>;" for every projected expression.
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(Expressions.statement(
+          Expressions.assign(Expressions.arrayIndex(outputValues, Expressions.constant(i)), list.get(i))));
+    }
+    return createSamzaExpressionFromCalcite(executionContext, root, inputValues, outputValues, builder.toBlock());
+  }
+
+  /**
+   * This method takes the java statement block, inputs, outputs needed by the statement block to create an object
+   * of class that implements the interface {@link Expression}
+   *
+   * for e.g.
+   *   Query : select id from profile
+   *      where profile table has relational schema with id(NUMBER) and name(VARCHAR) columns.
+   *    This query will result in the following relational plan
+   *      LogicalProject(id=[$1])
+   *        LogicalTableScan(table=[[profile]])
+   *
+   *
+   *    And the corresponding expressions are
+   *       inputs : EnumerableTableScan (Which is the output of LogicalTableScan)
+   *       nodes : [$1] Which essentially means pick the column at index 1 from the input
+   *
+   *    This expression corresponding to the logicalProject "[$1]" gets converted into a java statement block
+   *    {
+   *      outputValues[0] = (Integer) inputValues[1];
+   *    }
+   *
+   *    This method converts this statement block into an equivalent {@link Expression} object whose execute method
+   *    executes the above java statement block
+   *
+   * @param executionContext parameter of the generated method that carries the {@link SamzaSqlExecutionContext}
+   * @param dataContext parameter of the generated method that carries the Calcite {@link DataContext}
+   * @param inputValues parameter of the generated method that carries the input columns
+   * @param outputValues parameter of the generated method that receives the results
+   * @param block statement block that becomes the body of the generated method
+   * @return instance of the generated class
+   * @throws SamzaException if the generated code cannot be compiled or instantiated
+   */
+  static org.apache.samza.sql.data.Expression createSamzaExpressionFromCalcite(ParameterExpression executionContext,
+      ParameterExpression dataContext, ParameterExpression inputValues, ParameterExpression outputValues,
+      BlockStatement block) {
+    final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+    // public void execute(SamzaSqlExecutionContext context, DataContext root, Object[] inputValues,
+    //     Object[] outputValues)
+    declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, void.class, SamzaBuiltInMethod.EXPR_EXECUTE2.method.getName(),
+            ImmutableList.of(executionContext, dataContext, inputValues, outputValues), block));
+
+    final ClassDeclaration classDeclaration = Expressions.classDecl(Modifier.PUBLIC, "SqlExpression", null,
+        ImmutableList.<Type>of(org.apache.samza.sql.data.Expression.class), declarations);
+    // Only the member declarations are rendered to source; the class name and implemented interfaces
+    // are handed to the class body evaluator separately in getExpression.
+    String s = Expressions.toString(declarations, "\n", false);
+
+    log.info("Generated code for expression: {}", s);
+
+    try {
+      return getExpression(classDeclaration, s);
+    } catch (Exception e) {
+      throw new SamzaException("Expression compilation failure.", e);
+    }
+  }
+
+  /**
+   * Creates the instance of the class defined in {@link ClassDeclaration}
+   * @param expr Class declaration that supplies the name and the implemented interfaces of the class to create.
+   * @param s The java code of the class body which should be used to create the instance.
+   * @return The object of the class which implements the interface {@link Expression} with the code that is passed as input.
+   * @throws CompileException declared by the class body evaluator when the java code cannot be compiled
+   * @throws IOException declared by the class body evaluator for failures while reading the java code
+   */
+  static Expression getExpression(ClassDeclaration expr, String s) throws CompileException, IOException {
+    ICompilerFactory compilerFactory;
+    try {
+      compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to instantiate java compiler", e);
+    }
+    IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
+    cbe.setClassName(expr.name);
+    cbe.setImplementedInterfaces(expr.implemented.toArray(new Class[expr.implemented.size()]));
+    cbe.setParentClassLoader(RexToJavaCompiler.class.getClassLoader());
+    cbe.setDebuggingInformation(true, true, true);
+
+    return (org.apache.samza.sql.data.Expression) cbe.createInstance(new StringReader(s));
+  }
+
+  /**
+   * Represents the methods in the class {@link Expression}
+   */
+  public enum SamzaBuiltInMethod {
+    EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute", SamzaSqlExecutionContext.class,
+        DataContext.class, Object[].class, Object[].class);
+
+    public final Method method;
+
+    /**
+     * Defines a method by looking it up on the given class.
+     */
+    SamzaBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
+      this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
new file mode 100644
index 0000000..88bcb61
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * Execution context shared with the compiled SQL expressions. It holds the SQL application config and
+ * lazily creates and caches one {@link ScalarUdf} instance per udf name.
+ */
+public class SamzaSqlExecutionContext {
+
+  private final SamzaSqlApplicationConfig sqlConfig;
+  // Udf metadata from the sql config, keyed by udf name.
+  private final Map<String, UdfMetadata> udfMetadata;
+  // Udf instances created so far, keyed by udf name.
+  private final Map<String, ScalarUdf> udfInstances = new HashMap<>();
+
+  public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) {
+    this.sqlConfig = config;
+    udfMetadata =
+        this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName, Function.identity()));
+  }
+
+  /**
+   * Returns the cached udf instance for the name, creating and initializing it on first use.
+   *
+   * @param clazz name of the class implementing the udf
+   * @param udfName name of the udf; this is the cache key
+   * @return the udf instance
+   * @throws SamzaException if the udf is not registered or cannot be instantiated
+   */
+  public ScalarUdf getOrCreateUdf(String clazz, String udfName) {
+    return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, udfName));
+  }
+
+  /**
+   * Creates a new udf instance and initializes it with the udf's config.
+   *
+   * @param clazz name of the class implementing the udf
+   * @param udfName name of the udf, used to look up its metadata
+   * @return a newly created and initialized udf
+   * @throws SamzaException if no metadata exists for the udf name or the instance cannot be created
+   */
+  public ScalarUdf createInstance(String clazz, String udfName) {
+    UdfMetadata metadata = udfMetadata.get(udfName);
+    if (metadata == null) {
+      // Report the unknown udf by name instead of failing with a NullPointerException.
+      throw new SamzaException(String.format("No metadata found for udf %s of class %s", udfName, clazz));
+    }
+    Config udfConfig = metadata.getUdfConfig();
+    ScalarUdf scalarUdf = ReflectionUtils.createInstance(clazz);
+    if (scalarUdf == null) {
+      String msg = String.format("Couldn't create udf %s of class %s", udfName, clazz);
+      throw new SamzaException(msg);
+    }
+    scalarUdf.init(udfConfig);
+    return scalarUdf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
new file mode 100644
index 0000000..b5df545
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -0,0 +1,123 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+
+/**
+ * A relational row as seen by Samza SQL. A message carries an optional key, the ordered column names and
+ * the ordered column values of the row. No other column metadata (primary key, nullability, ...) is stored
+ * at the moment.
+ */
+public class SamzaSqlRelMessage {
+
+  public static final String KEY_NAME = "__key__";
+
+  private final Object key;
+  private final List<String> fieldNames;
+  private final List<Object> fieldValues;
+  // The key (only when it is non-null) followed by all the field values.
+  private final List<Object> keyAndFieldValues;
+
+  /**
+   * Builds a message for one row of the table.
+   * @param key Key of the row. It is optional and may be null.
+   * @param names Ordered list of field names in the row.
+   * @param values Ordered list of all the values in the row. A message may represent a delete in a change
+   *               capture stream, in which case all the values can be null.
+   */
+  public SamzaSqlRelMessage(Object key, List<String> names, List<Object> values) {
+    this.key = key;
+    this.fieldValues = new ArrayList<>(values);
+    this.fieldNames = new ArrayList<>(names);
+    this.keyAndFieldValues = new ArrayList<>();
+    if (key != null) {
+      this.keyAndFieldValues.add(key);
+    }
+    this.keyAndFieldValues.addAll(values);
+  }
+
+  /**
+   * @return the names of all the columns, in order.
+   */
+  public List<String> getFieldNames() {
+    return fieldNames;
+  }
+
+  /**
+   * @return the values of all the columns, in order and without the key.
+   */
+  public List<Object> getFieldValues() {
+    return fieldValues;
+  }
+
+  /**
+   * @return the key, when it is non-null, followed by the values of all the columns.
+   */
+  public List<Object> getRelFieldValues() {
+    return keyAndFieldValues;
+  }
+
+  /**
+   * @return the key of the row, possibly null.
+   */
+  public Object getKey() {
+    return key;
+  }
+
+  /**
+   * Looks up the value of the first field with the given name.
+   * @param name Name of the field.
+   * @return the value of the field; empty when no field has that name or when the value is null.
+   */
+  public Optional<Object> getField(String name) {
+    int position = 0;
+    for (String candidate : fieldNames) {
+      if (candidate.equals(name)) {
+        return Optional.ofNullable(fieldValues.get(position));
+      }
+      position++;
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Creates a {@link SamzaSqlRelMessage} from relational field values and names. When the names contain
+   * the special {@link #KEY_NAME} entry, that entry and its value are taken out of the fields and used
+   * as the key of the message. The lists passed in are not modified.
+   * @param fieldValues Field values that can include the key as well.
+   * @param fieldNames Field names in the rel message that can include the special __key__
+   * @return Created SamzaSqlRelMessage.
+   */
+  public static SamzaSqlRelMessage createRelMessage(List<Object> fieldValues, List<String> fieldNames) {
+    List<String> remainingNames = new ArrayList<>(fieldNames);
+    List<Object> remainingValues = new ArrayList<>(fieldValues);
+
+    int keyPosition = remainingNames.indexOf(KEY_NAME);
+    if (keyPosition == -1) {
+      return new SamzaSqlRelMessage(null, remainingNames, remainingValues);
+    }
+
+    Object key = remainingValues.remove(keyPosition);
+    remainingNames.remove(keyPosition);
+    return new SamzaSqlRelMessage(key, remainingNames, remainingValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
new file mode 100644
index 0000000..39a26ce
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+public class FlattenUdf implements ScalarUdf {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  public Object execute(Object... arg) {
+    List value = (List) arg[0];
+    return value != null && !value.isEmpty() ? value.get(0) : value;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
new file mode 100644
index 0000000..1203b25
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SourceResolverFactory;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory for a {@link SourceResolver} that uses static config to build the {@link SqlSystemStreamConfig}
+ * of a source. Sources are named {@code <system>.<stream>} and the configs handed to the
+ * {@link SqlSystemStreamConfig} are the ones whose keys start with {@code <system>.}.
+ */
+public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedSourceResolverFactory.class);
+
+  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
+
+  @Override
+  public SourceResolver create(Config config) {
+    return new ConfigBasedSourceResolver(config);
+  }
+
+  /**
+   * Resolver backed by the config passed to the factory. It is a static nested class because it only
+   * needs the config and the static logger, never the factory instance.
+   */
+  private static final class ConfigBasedSourceResolver implements SourceResolver {
+    private final Config config;
+
+    ConfigBasedSourceResolver(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * @param source source name of the format {@code <system>.<stream>}
+     * @return config of the system stream the source name refers to
+     * @throws SamzaException when the source name does not consist of exactly two dot separated parts
+     */
+    @Override
+    public SqlSystemStreamConfig fetchSourceInfo(String source) {
+      String[] sourceComponents = source.split("\\.");
+      if (sourceComponents.length != 2) {
+        String msg = String.format("Source %s is not of the format <system>.<stream>", source);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      String systemName = sourceComponents[0];
+      String streamName = sourceComponents[1];
+
+      return new SqlSystemStreamConfig(systemName, streamName, fetchSystemConfigs(systemName));
+    }
+
+    // Returns the part of the config whose keys start with "<systemName>.".
+    private Config fetchSystemConfigs(String systemName) {
+      return config.subset(systemName + ".");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
new file mode 100644
index 0000000..c98a4a1
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.interfaces.UdfResolver;
+import org.apache.samza.sql.udfs.ScalarUdf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Udf resolver that uses static config to return the UDFs present in the Samza SQL application.
+ * All the UDF classes are provided to this resolver as a comma separated list of values for the config named
+ * "udfClasses".
+ * This resolver loads all the udf classes that are configured, performs the validation to ensure that they
+ * implement {@link ScalarUdf} and have a public method named "execute".
+ * The name of a UDF is the part of the simple class name that precedes the last occurrence of "udf"
+ * (ignoring case), e.g. the class FlattenUdf yields the UDF name Flatten.
+ */
+public class ConfigBasedUdfResolver implements UdfResolver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedUdfResolver.class);
+  public static final String CFG_UDF_CLASSES = "udfClasses";
+  public static final String UDF_METHOD_NAME = "execute";
+
+  private final List<UdfMetadata> udfs = new ArrayList<>();
+
+  /**
+   * Loads and validates all the configured UDF classes.
+   * @param config properties holding the comma separated list of UDF class names under "udfClasses"
+   * @param udfConfig config of the UDFs; each UDF gets the subset whose keys start with "{udfName}."
+   * @throws SamzaException when a class cannot be loaded, does not implement {@link ScalarUdf}, has no
+   *         method named "execute" or has no "udf" in its simple name
+   */
+  public ConfigBasedUdfResolver(Properties config, Config udfConfig) {
+    // Entries are trimmed so that a list written as "a.B, c.D" resolves both classes.
+    List<String> udfClasses = Arrays.stream(config.getProperty(CFG_UDF_CLASSES, "").split(","))
+        .map(String::trim)
+        .filter(StringUtils::isNotBlank)
+        .collect(Collectors.toList());
+
+    for (String udfClassName : udfClasses) {
+      Class<?> udfClass;
+      try {
+        udfClass = Class.forName(udfClassName);
+      } catch (ClassNotFoundException e) {
+        String msg = String.format("Couldn't load the udf class %s", udfClassName);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
+      }
+
+      if (!ScalarUdf.class.isAssignableFrom(udfClass)) {
+        String msg = String.format("Udf class %s is not extended from %s", udfClassName, ScalarUdf.class.getName());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      // Only the method name is matched; with overloads the first one returned by getMethods() is used.
+      Optional<Method> udfMethod =
+          Arrays.stream(udfClass.getMethods()).filter(x -> x.getName().equals(UDF_METHOD_NAME)).findFirst();
+
+      if (!udfMethod.isPresent()) {
+        String msg = String.format("Udf Class %s doesn't implement method named %s", udfClassName, UDF_METHOD_NAME);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      String simpleName = udfClass.getSimpleName();
+      int udfIndex = simpleName.toLowerCase().lastIndexOf("udf");
+      if (udfIndex < 0) {
+        // Without this check substring(0, -1) fails with an exception that does not name the class.
+        String msg = String.format("Udf class %s doesn't contain \"udf\" in its simple name, "
+            + "so the udf name cannot be derived", udfClassName);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      String udfName = simpleName.substring(0, udfIndex);
+
+      udfs.add(new UdfMetadata(udfName, udfMethod.get(), udfConfig.subset(udfName + ".")));
+    }
+  }
+
+  @Override
+  public Collection<UdfMetadata> getUdfs() {
+    return udfs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
new file mode 100644
index 0000000..ee95224
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+
+/**
+ * Samza SQL application uses {@link RelSchemaProvider} to get the relational schema corresponding to the system
+ * and stream.
+ */
+public interface RelSchemaProvider {
+  /**
+   * Converts the schema to a relational schema. It is possible for multiple tables to be associated
+   * with a single schema.
+   * @return Relational schema corresponding to the system stream.
+   */
+  RelDataType getRelationalSchema();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
new file mode 100644
index 0000000..c614cdf
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+/**
+ * Factory that is used to create {@link RelSchemaProvider}
+ */
+public interface RelSchemaProviderFactory {
+
+  /**
+   * Create a {@link RelSchemaProvider} given the system stream and the config
+   * @param systemStream System stream handed to the factory; presumably the stream whose schema the
+   *                     created {@link RelSchemaProvider} describes (confirm against the callers).
+   * @param config Config needed to create the {@link RelSchemaProvider}
+   * @return {@link RelSchemaProvider} object created.
+   */
+  RelSchemaProvider create(SystemStream systemStream, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
new file mode 100644
index 0000000..12d5f28
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * Samza SQL application uses {@link SamzaRelConverter} to convert the Samza messages to relational message before
+ * it can be processed by the calcite engine.
+ * The {@link SamzaRelConverter} is configurable at a system level, so it is possible to configure different
+ * {@link SamzaRelConverter} for different systems.
+ */
+public interface SamzaRelConverter {
+  /**
+   * Converts the Samza message to a relational message.
+   * @param message key and value of the samza message that needs to be converted.
+   * @return Relational message extracted from the samza message.
+   */
+  SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> message);
+
+  /**
+   * Convert the relational message to the output message.
+   * @param relMessage relational message that needs to be converted.
+   * @return the key and value of the Samza message
+   */
+  KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
new file mode 100644
index 0000000..f239df6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory that is used to create {@link SamzaRelConverter}
+ */
+public interface SamzaRelConverterFactory {
+
+  /**
+   * Create a {@link SamzaRelConverter}. This method is called when the framework wants to create the
+   * {@link SamzaRelConverter} corresponding to the system.
+   * @param systemStream
+   *  system stream handed to the factory; presumably the stream whose messages the converter handles
+   *  (confirm against the callers)
+   * @param relSchemaProvider
+   *  schema provider handed to the factory; presumably the one providing the relational schema of the
+   *  system stream (confirm against the callers)
+   * @param config
+   *  config that is used to create the object
+   * @return Returns the object created.
+   */
+  SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
new file mode 100644
index 0000000..ac3fd31
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
@@ -0,0 +1,34 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+/**
+ * Source Resolvers are used by the Samza SQL application to fetch the {@link SqlSystemStreamConfig}
+ * corresponding to a source.
+ */
+public interface SourceResolver {
+  /**
+   * Returns the {@link SqlSystemStreamConfig} corresponding to the source name
+   * @param sourceName
+   *  name of the source whose {@link SqlSystemStreamConfig} needs to be fetched.
+   * @return
+   *  System stream config corresponding to the source.
+   */
+  SqlSystemStreamConfig fetchSourceInfo(String sourceName);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
new file mode 100644
index 0000000..274a6b1
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory that is used to create a {@link SourceResolver}.
+ */
+public interface SourceResolverFactory {
+
+  /**
+   * Creates the {@link SourceResolver}. This is called during the application initialization.
+   * @param config config for the {@link SourceResolver}
+   * @return the created {@link SourceResolver}
+   */
+  SourceResolver create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
new file mode 100644
index 0000000..df21784
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
@@ -0,0 +1,74 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configs associated with a system stream. Instances are immutable.
+ */
+public class SqlSystemStreamConfig {
+
+  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+  private final String systemName;
+
+  private final String streamName;
+
+  private final String samzaRelConverterName;
+  private final SystemStream systemStream;
+  // Read from the config without validation, unlike samzaRelConverterName.
+  private final String relSchemaProviderName;
+
+  /**
+   * Reads the converter name and the schema provider name of the system from the system config.
+   * @param systemName name of the system
+   * @param streamName name of the stream
+   * @param systemConfig config of the system, holding the values for {@link #CFG_SAMZA_REL_CONVERTER}
+   *                     and {@link #CFG_REL_SCHEMA_PROVIDER}
+   * @throws IllegalArgumentException when {@link #CFG_SAMZA_REL_CONVERTER} is not set or empty
+   */
+  public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
+
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.systemStream = new SystemStream(systemName, streamName);
+
+    samzaRelConverterName = systemConfig.get(CFG_SAMZA_REL_CONVERTER);
+    relSchemaProviderName = systemConfig.get(CFG_REL_SCHEMA_PROVIDER);
+    Validate.notEmpty(samzaRelConverterName,
+        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+  }
+
+  /**
+   * @return name of the system.
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  /**
+   * @return name of the stream.
+   */
+  public String getStreamName() {
+    return streamName;
+  }
+
+  /**
+   * @return value configured for {@link #CFG_SAMZA_REL_CONVERTER}; never empty.
+   */
+  public String getSamzaRelConverterName() {
+    return samzaRelConverterName;
+  }
+
+  /**
+   * @return value configured for {@link #CFG_REL_SCHEMA_PROVIDER}; it is not validated by the constructor.
+   */
+  public String getRelSchemaProviderName() {
+    return relSchemaProviderName;
+  }
+
+  /**
+   * @return {@link SystemStream} built from the system name and the stream name.
+   */
+  public SystemStream getSystemStream() {
+    return systemStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
new file mode 100644
index 0000000..b1a2d6d
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.lang.reflect.Method;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Immutable holder of the metadata of a Udf: its name, the method implementing it and its config.
+ */
+public class UdfMetadata {
+
+  private final String name;
+  private final Method udfMethod;
+  private final Config udfConfig;
+
+  /**
+   * @param name name of the Udf
+   * @param udfMethod {@link Method} corresponding to the Udf
+   * @param udfConfig config of the Udf
+   */
+  public UdfMetadata(String name, Method udfMethod, Config udfConfig) {
+    this.udfConfig = udfConfig;
+    this.udfMethod = udfMethod;
+    this.name = name;
+  }
+
+  /**
+   * @return Returns the name of the Udf.
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * @return Returns the instance of the {@link Method} corresponding to the UDF.
+   */
+  public Method getUdfMethod() {
+    return this.udfMethod;
+  }
+
+  /**
+   * @return Returns the config of the Udf.
+   */
+  public Config getUdfConfig() {
+    return this.udfConfig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
new file mode 100644
index 0000000..7e84118
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.util.Collection;
+
+/**
+ * Plugin that is used to discover the UDFs available to Samza SQL.
+ */
+public interface UdfResolver {
+
+  /**
+   * This method is called by the framework during initialization to fetch the {@link UdfMetadata} for all the UDFs
+   * available in the system.
+   * @return {@link UdfMetadata} for all the UDFs available in the system.
+   */
+  Collection<UdfMetadata> getUdfs();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
new file mode 100644
index 0000000..851c895
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.collect.Maps;
+
+
+/**
+ * {@link SqlOperandTypeChecker} that accepts operands of any type and only carries the range of the
+ * allowed operand count. Instances are obtained through {@link #ANY_CHECKER} or
+ * {@link #getChecker(int, int)}.
+ */
+class Checker implements SqlOperandTypeChecker {
+  private final SqlOperandCountRange range;
+
+  public static final Checker ANY_CHECKER = new Checker();
+  // Cache of the checkers created so far, keyed by (min, max).
+  // NOTE(review): plain HashMap without synchronization - confirm getChecker is not called concurrently.
+  private static final Map<Pair<Integer, Integer>, Checker> checkerMap = Maps.newHashMap();
+
+  /**
+   * Returns the checker for the given operand count range, creating and caching it on first use.
+   * @param min minimum number of operands
+   * @param max maximum number of operands
+   * @return the cached checker for (min, max)
+   */
+  public static Checker getChecker(int min, int max) {
+    final Pair<Integer, Integer> key = Pair.of(min, max);
+    return checkerMap.computeIfAbsent(key,
+        unused -> min == max ? new Checker(min) : new Checker(min, max));
+  }
+
+  private Checker(int size) {
+    range = SqlOperandCountRanges.of(size);
+  }
+
+  private Checker(int min, int max) {
+    range = SqlOperandCountRanges.between(min, max);
+  }
+
+  private Checker() {
+    range = SqlOperandCountRanges.any();
+  }
+
+  @Override
+  public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+    // Operand types are never rejected by this checker.
+    return true;
+  }
+
+  @Override
+  public SqlOperandCountRange getOperandCountRange() {
+    return range;
+  }
+
+  @Override
+  public String getAllowedSignatures(SqlOperator op, String opName) {
+    // NOTE(review): the text mentions "Drill"; it looks carried over from another project - confirm
+    // whether it should name Samza before changing this string.
+    return opName + "(Drill - Opaque)";
+  }
+
+  @Override
+  public Consistency getConsistency() {
+    return Consistency.NONE;
+  }
+
+  @Override
+  public boolean isOptional(int i) {
+    return false;
+  }
+}
\ No newline at end of file