You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/16 19:15:02 UTC

[3/3] samza git commit: Samza SQL implementation for basic projects, filtering and UDFs

Samza SQL implementation for basic projects, filtering and UDFs

## Samza SQL implementation for basic projects, filtering and

## Design document:
https://docs.google.com/document/d/1bE-ZuPfTpntm1hT3GwQEShYDiTqU3IkxeP4-3ZcGHgU/edit?usp=sharing

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #295 from srinipunuru/samza-sql.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9fa8beed
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9fa8beed
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9fa8beed

Branch: refs/heads/master
Commit: 9fa8beed7ac37c33a0c01412b075a9c9a0b47441
Parents: 2e2e00e
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Thu Nov 16 11:14:32 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Nov 16 11:14:32 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  33 ++-
 gradle/dependency-versions.gradle               |   2 +
 .../org/apache/samza/sql/udfs/ScalarUdf.java    |  48 ++++
 .../apache/samza/sql/avro/AvroRelConverter.java | 183 ++++++++++++++
 .../samza/sql/avro/AvroRelConverterFactory.java |  44 ++++
 .../samza/sql/avro/AvroRelSchemaProvider.java   |  28 +++
 .../samza/sql/avro/AvroTypeFactoryImpl.java     | 132 ++++++++++
 ...ConfigBasedAvroRelSchemaProviderFactory.java |  63 +++++
 .../org/apache/samza/sql/data/Expression.java   |  38 +++
 .../samza/sql/data/RexToJavaCompiler.java       | 224 +++++++++++++++++
 .../sql/data/SamzaSqlExecutionContext.java      |  61 +++++
 .../samza/sql/data/SamzaSqlRelMessage.java      | 123 ++++++++++
 .../org/apache/samza/sql/fn/FlattenUdf.java     |  36 +++
 .../impl/ConfigBasedSourceResolverFactory.java  |  71 ++++++
 .../samza/sql/impl/ConfigBasedUdfResolver.java  |  97 ++++++++
 .../samza/sql/interfaces/RelSchemaProvider.java |  36 +++
 .../interfaces/RelSchemaProviderFactory.java    |  33 +++
 .../samza/sql/interfaces/SamzaRelConverter.java |  46 ++++
 .../interfaces/SamzaRelConverterFactory.java    |  39 +++
 .../samza/sql/interfaces/SourceResolver.java    |  34 +++
 .../sql/interfaces/SourceResolverFactory.java   |  36 +++
 .../sql/interfaces/SqlSystemStreamConfig.java   |  74 ++++++
 .../samza/sql/interfaces/UdfMetadata.java       |  61 +++++
 .../samza/sql/interfaces/UdfResolver.java       |  35 +++
 .../org/apache/samza/sql/planner/Checker.java   |  93 +++++++
 .../apache/samza/sql/planner/QueryPlanner.java  | 153 ++++++++++++
 .../sql/planner/SamzaSqlOperatorTable.java      | 101 ++++++++
 .../sql/planner/SamzaSqlScalarFunctionImpl.java |  84 +++++++
 .../sql/planner/SamzaSqlUdfOperatorTable.java   |  62 +++++
 .../samza/sql/runner/SamzaSqlApplication.java   |  56 +++++
 .../sql/runner/SamzaSqlApplicationConfig.java   | 245 +++++++++++++++++++
 .../sql/runner/SamzaSqlApplicationRunner.java   | 133 ++++++++++
 .../apache/samza/sql/testutil/ConfigUtil.java   |  62 +++++
 .../org/apache/samza/sql/testutil/JsonUtil.java |  91 +++++++
 .../samza/sql/testutil/ReflectionUtils.java     |  62 +++++
 .../samza/sql/testutil/SamzaSqlQueryParser.java | 188 ++++++++++++++
 .../samza/sql/testutil/SqlFileParser.java       | 103 ++++++++
 .../samza/sql/translator/FilterTranslator.java  |  62 +++++
 .../samza/sql/translator/ProjectTranslator.java | 108 ++++++++
 .../samza/sql/translator/QueryTranslator.java   |  96 ++++++++
 .../samza/sql/translator/ScanTranslator.java    |  70 ++++++
 .../samza/sql/translator/TranslatorContext.java | 162 ++++++++++++
 .../apache/samza/sql/TestQueryTranslator.java   | 103 ++++++++
 .../sql/TestSamzaSqlApplicationConfig.java      |  92 +++++++
 .../samza/sql/TestSamzaSqlFileParser.java       |  58 +++++
 .../samza/sql/TestSamzaSqlQueryParser.java      |  70 ++++++
 .../samza/sql/TestSamzaSqlRelMessage.java       |  46 ++++
 .../samza/sql/avro/TestAvroRelConversion.java   | 239 ++++++++++++++++++
 .../samza/sql/avro/schemas/ComplexRecord.avsc   | 143 +++++++++++
 .../samza/sql/avro/schemas/ComplexRecord.java   |  92 +++++++
 .../apache/samza/sql/avro/schemas/MyFixed.java  |  29 +++
 .../samza/sql/avro/schemas/SimpleRecord.avsc    |  39 +++
 .../samza/sql/avro/schemas/SimpleRecord.java    |  52 ++++
 .../samza/sql/avro/schemas/SubRecord.java       |  53 ++++
 .../samza/sql/avro/schemas/TestEnumType.java    |  31 +++
 .../samza/sql/e2e/TestSamzaSqlEndToEnd.java     | 137 +++++++++++
 .../sql/system/ConsoleLoggingSystemFactory.java |  83 +++++++
 .../samza/sql/system/SimpleSystemAdmin.java     |  61 +++++
 .../samza/sql/system/TestAvroSystemFactory.java | 156 ++++++++++++
 .../samza/sql/testutil/MyTestArrayUdf.java      |  37 +++
 .../apache/samza/sql/testutil/MyTestUdf.java    |  45 ++++
 .../samza/sql/testutil/SamzaSqlTestConfig.java  | 103 ++++++++
 samza-sql/src/test/resources/log4j.xml          |  43 ++++
 settings.gradle                                 |   5 +-
 64 files changed, 5218 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index da08ee9..59ff5f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -104,6 +104,12 @@ allprojects {
   }
 }
 
+idea {
+  project {
+    languageLevel = 1.8
+  }
+}
+
 subprojects {
   apply plugin: 'eclipse'
   apply plugin: 'project-report'
@@ -185,7 +191,6 @@ project(":samza-core_$scalaVersion") {
   }
 }
 
-
 project(':samza-azure') {
   apply plugin: 'java'
   apply plugin: 'checkstyle'
@@ -215,7 +220,6 @@ project(':samza-azure') {
   }
 }
 
-
 project(":samza-autoscaling_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'
@@ -269,7 +273,24 @@ project(':samza-elasticsearch') {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
 
-    // Logging in tests is good.
+    testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
+  }
+}
+
+project(':samza-sql') {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-kafka_$scalaVersion")
+    compile "org.apache.avro:avro:$avroVersion"
+    compile "org.apache.calcite:calcite-core:$calciteVersion"
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+
+    testCompile project(":samza-test_$scalaVersion")
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 }
@@ -449,7 +470,7 @@ project(":samza-shell") {
   }
 
   // Usage: ./gradlew samza-shell:runJob \
-  //    -PconfigPath=file:///path/to/job/config.properties
+  //  -PconfigPath=file:///path/to/job/config.properties
   task runJob(type:JavaExec) {
     description 'To run a job (defined in a properties file)'
     main = 'org.apache.samza.job.JobRunner'
@@ -459,7 +480,7 @@ project(":samza-shell") {
   }
 
   // Usage: ./gradlew samza-shell:checkpointTool \
-  //    -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
+  //  -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
   task checkpointTool(type:JavaExec) {
     description 'Command-line tool to inspect and manipulate the job’s checkpoint'
     main = 'org.apache.samza.checkpoint.CheckpointTool'
@@ -470,7 +491,7 @@ project(":samza-shell") {
   }
 
   // Usage: ./gradlew samza-shell:kvPerformanceTest
-  //    -PconfigPath=file:///path/to/job/config.properties
+  //  -PconfigPath=file:///path/to/job/config.properties
   task kvPerformanceTest(type:JavaExec) {
     description 'Command-line tool to run key-value performance tests'
     main = 'org.apache.samza.test.performance.TestKeyValuePerformance'

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a8af9a8..4f467ab 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -18,6 +18,8 @@
  */
  ext {
   apacheCommonsCollections4Version = "4.0"
+  avroVersion = "1.7.0"
+  calciteVersion = "1.14.0"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
   commonsHttpClientVersion = "3.1"

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java b/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
new file mode 100644
index 0000000..719cace
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.udfs;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * The base class for the Scalar UDFs. All the scalar UDF classes needs to extend this and implement a method named
+ * "execute". The number and type of arguments for the execute method in the UDF class should match the number and type of fields
+ * used while invoking this UDF in SQL statement.
+ * Say for e.g. User creates a UDF class with signature int execute(int var1, String var2). It can be used in a SQL query
+ *     select myudf(id, name) from profile
+ * In the above query, Profile should contain fields named 'id' of INTEGER/NUMBER type and 'name' of type VARCHAR/CHARACTER
+ */
+public interface ScalarUdf {
+  /**
+   * Udfs can implement this method to perform any initialization that they may need.
+   * @param udfConfig Config specific to the udf.
+   */
+  void init(Config udfConfig);
+
+  /**
+   * Actual implementation of the udf function
+   * @param args
+   *   list of all arguments that the udf needs
+   * @return
+   *   Return value from the scalar udf.
+   */
+  Object execute(Object... args);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
new file mode 100644
index 0000000..1c17295
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -0,0 +1,183 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class converts a Samza Avro messages to Relational messages and vice versa.
+ * This supports Samza messages where Key is a string and Value is an avro record.
+ *
+ * Conversion from Samza to Relational Message :
+ *     The key part of the samza message is represented as a special column {@link SamzaSqlRelMessage#KEY_NAME}
+ *     in relational message.
+ *
+ *     The value part of the samza message is expected to be {@link IndexedRecord}, All the fields in the IndexedRecord form
+ *     the corresponding fields of the relational message.
+ *
+ * Conversion from Relational to Samza Message :
+ *     This converts the Samza relational message into Avro {@link GenericRecord}.
+ *     All the fields of the relational message is become fields of the Avro GenericRecord except of the field with name
+ *     {@link SamzaSqlRelMessage#KEY_NAME}. This special field becomes the Key in the output Samza message.
+ */
+public class AvroRelConverter implements SamzaRelConverter {
+
+  protected final Config config;
+  private final Schema avroSchema;
+  private final RelDataType relationalSchema;
+
+  /**
+   * Class that converts the avro field to their corresponding relational fields
+   * Array fields are converted from Avro {@link org.apache.avro.generic.GenericData.Array} to {@link ArrayList}
+   */
+  public enum AvroToRelObjConverter {
+
+    /**
+     * If the relational field type is ArraySqlType, We expect the avro field to be of type either
+     * {@link GenericData.Array} or {@link List} which then is converted to Rel field of type {@link ArrayList}
+     */
+    ArraySqlType {
+      @Override
+      Object convert(Object avroObj) {
+        ArrayList<Object> retVal = new ArrayList<>();
+        if (avroObj != null) {
+          if (avroObj instanceof GenericData.Array) {
+            retVal.addAll(((GenericData.Array) avroObj));
+          } else if (avroObj instanceof List) {
+            retVal.addAll((List) avroObj);
+          }
+        }
+
+        return retVal;
+      }
+    },
+
+    /**
+     * If the relational field type is MapSqlType, We expect the avro field to be of type
+     * {@link Map}
+     */
+    MapSqlType {
+      @Override
+      Object convert(Object obj) {
+        Map<String, Object> retVal = new HashMap<>();
+        if (obj != null) {
+          retVal.putAll((Map<String, ?>) obj);
+        }
+        return retVal;
+      }
+    },
+
+    /**
+     * If the relational field type is RelRecordType, The field is considered an object
+     * and moved to rel field without any translation.
+     */
+    RelRecordType {
+      @Override
+      Object convert(Object obj) {
+        return obj;
+      }
+    },
+
+    /**
+     * If the relational field type is BasicSqlType, The field is moved to rel field without any translation.
+     */
+    BasicSqlType {
+      @Override
+      Object convert(Object obj) {
+        return obj;
+      }
+    };
+
+    abstract Object convert(Object obj);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);
+
+  private final Schema arraySchema = Schema.parse(
+      "{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]},\"java-class\":\"java.util.List\"}");
+  private final Schema mapSchema = Schema.parse(
+      "{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]}}");
+
+
+  public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
+    this.config = config;
+    this.relationalSchema = schemaProvider.getRelationalSchema();
+    this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
+  }
+
+  @Override
+  public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
+    List<Object> values = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+    Object value = samzaMessage.getValue();
+    if (value instanceof IndexedRecord) {
+      IndexedRecord record = (IndexedRecord) value;
+      fieldNames.addAll(relationalSchema.getFieldNames());
+      values.addAll(relationalSchema.getFieldList()
+          .stream()
+          .map(x -> getRelField(x.getType(), record.get(this.avroSchema.getField(x.getName()).pos())))
+          .collect(Collectors.toList()));
+    } else if (value == null) {
+      fieldNames.addAll(relationalSchema.getFieldNames());
+      IntStream.range(0, fieldNames.size() - 1).forEach(x -> values.add(null));
+    } else {
+      String msg = "Avro message converter doesn't support messages of type " + value.getClass();
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, values);
+  }
+
+  @Override
+  public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+    GenericRecord record = new GenericData.Record(this.avroSchema);
+    List<String> fieldNames = relMessage.getFieldNames();
+    List<Object> values = relMessage.getFieldValues();
+    for (int index = 0; index < fieldNames.size(); index++) {
+      record.put(fieldNames.get(index), values.get(index));
+    }
+
+    return new KV<>(relMessage.getKey(), record);
+  }
+
+  private Object getRelField(RelDataType relType, Object avroObj) {
+    return AvroToRelObjConverter.valueOf(relType.getClass().getSimpleName()).convert(avroObj);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
new file mode 100644
index 0000000..278735f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.HashMap;
+import java.util.Properties;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Avro Schema Resolver that uses static config to return a schema for a SystemStream.
+ * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ */
+public class AvroRelConverterFactory implements SamzaRelConverterFactory {
+
+  private final HashMap<SystemStream, SamzaRelConverter> relConverters = new HashMap<>();
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider schemaProvider, Config config) {
+    return relConverters.computeIfAbsent(systemStream,
+        ss -> new AvroRelConverter(ss, (AvroRelSchemaProvider) schemaProvider, config));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
new file mode 100644
index 0000000..fb11624
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
@@ -0,0 +1,28 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.system.SystemStream;
+
+
+public interface AvroRelSchemaProvider extends RelSchemaProvider {
+  String getSchema(SystemStream systemStream);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
new file mode 100644
index 0000000..74e15e9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -0,0 +1,132 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory that creates the Calcite relational types from the Avro Schema. This is used by the
+ * AvroRelConverter to convert the Avro schema to calcite relational schema.
+ */
+public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroTypeFactoryImpl.class);
+
+  public AvroTypeFactoryImpl() {
+    super(RelDataTypeSystem.DEFAULT);
+  }
+
+  public RelDataType createType(Schema schema) {
+    Schema.Type type = schema.getType();
+    if (type != Schema.Type.RECORD) {
+      String msg =
+          String.format("System supports only RECORD as top level avro type, But the Schema's type is %s", type);
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    List<RelDataTypeField> relFields = getRelFields(schema.getFields());
+    return new RelRecordType(relFields);
+  }
+
+  private List<RelDataTypeField> getRelFields(List<Schema.Field> fields) {
+    List<RelDataTypeField> relFields = new ArrayList<>();
+
+    for (Schema.Field field : fields) {
+      String fieldName = field.name();
+      int fieldPos = field.pos() + 1;
+      RelDataType dataType = getRelDataType(field.schema());
+      relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
+    }
+
+    return relFields;
+  }
+
+  private RelDataType getRelDataType(Schema fieldSchema) {
+    switch (fieldSchema.getType()) {
+      case ARRAY:
+        // TODO JavaTypeFactoryImpl should convert Array into Array(ANY, ANY)
+        // return new ArraySqlType(createSqlType(SqlTypeName.ANY), true);
+        return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+      case BOOLEAN:
+        return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN), true);
+      case DOUBLE:
+        return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE), true);
+      case FLOAT:
+        return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT), true);
+      case ENUM:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+      case UNION:
+        return getRelTypeFromUnionTypes(fieldSchema.getTypes());
+      case FIXED:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), true);
+      case STRING:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+      case BYTES:
+        return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY), true);
+      case INT:
+        return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER), true);
+      case LONG:
+        return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true);
+      case RECORD:
+//        List<RelDataTypeField> relFields = getRelFields(fieldSchema);
+//        return new RelRecordType(relFields);
+        // TODO Calcite execution engine doesn't support record type yet.
+        return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+      case MAP:
+        // JavaTypeFactoryImpl converts map into Map(ANY, ANY)
+        return super.createMapType(createTypeWithNullability(createSqlType(SqlTypeName.ANY), true),
+            createTypeWithNullability(createSqlType(SqlTypeName.ANY), true));
+      default:
+        String msg = String.format("Field Type %s is not supported", fieldSchema.getType());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+    }
+  }
+
+  private RelDataType getRelTypeFromUnionTypes(List<Schema> types) {
+    // Typically a nullable field's schema is configured as an union of Null and a Type.
+    // This is to check whether the Union is a Nullable field
+    if (types.size() == 2) {
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return getRelDataType(types.get(1));
+      } else if ((types.get(1).getType() == Schema.Type.NULL)) {
+        return getRelDataType(types.get(0));
+      }
+    }
+
+    return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR), true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..4397a75
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Avro Schema Resolver that uses static config to return a schema for a SystemStream.
+ * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ */
+public class ConfigBasedAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+  public static final String CFG_SOURCE_SCHEMA = "%s.%s.schema";
+
+  public RelSchemaProvider create(SystemStream systemStream, Config config) {
+    return new ConfigBasedAvroRelSchemaProvider(systemStream, config);
+  }
+
+  public static class ConfigBasedAvroRelSchemaProvider implements AvroRelSchemaProvider {
+    private final Config config;
+    private final SystemStream systemStream;
+
+    public ConfigBasedAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+      this.systemStream = systemStream;
+      this.config = config;
+    }
+
+    public RelDataType getRelationalSchema() {
+      String schemaStr = getSchema(systemStream);
+      Schema schema = Schema.parse(schemaStr);
+      AvroTypeFactoryImpl avroTypeFactory = new AvroTypeFactoryImpl();
+      return avroTypeFactory.createType(schema);
+    }
+
+    @Override
+    public String getSchema(SystemStream systemStream) {
+      return config.get(String.format(CFG_SOURCE_SCHEMA, systemStream.getSystem(), systemStream.getStream()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java b/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
new file mode 100644
index 0000000..9386e31
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
@@ -0,0 +1,38 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import org.apache.calcite.DataContext;
+
+
+/**
+ * {@link RexToJavaCompiler} creates the Java code for each relational expression at runtime.
+ * This is the interface which the runtime generated java code for each Relational expression should implement.
+ */
+public interface Expression {
+  /**
+   * This method is used to implement the expressions that takes in columns as input and returns multiple values.
+   * @param context the context
+   * @param root the root
+   * @param inputValues All the relational columns for the particular row
+   * @param results the results Result values after executing the java code corresponding to the relational expression.
+   */
+  void execute(SamzaSqlExecutionContext context, DataContext root, Object[] inputValues, Object[] results);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
new file mode 100644
index 0000000..21c81a8
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.Pair;
+import org.apache.samza.SamzaException;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.commons.compiler.CompilerFactoryFactory;
+import org.codehaus.commons.compiler.IClassBodyEvaluator;
+import org.codehaus.commons.compiler.ICompilerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Defines a SQL row expression to a java class ({@link org.apache.samza.sql.data.Expression}) compiler.
+ *
+ * <p>This is based on Calcite's {@link org.apache.calcite.interpreter.JaninoRexCompiler}. This first generates
+ * a Java AST and them compile it to a class using Janino.</p>
+ */
+public class RexToJavaCompiler {
+  private static final Logger log = LoggerFactory.getLogger(RexToJavaCompiler.class);
+
+  private final RexBuilder rexBuilder;
+
+  public RexToJavaCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  /**
+   * Compiles a relational expression to a instance of {@link Expression}
+   *
+   * for e.g.
+   *    Query : select id from profile
+   *      where profile table has relational schema with id(NUMBER) and name(VARCHAR) columns.
+   *    This query will result in the following relational plan
+   *      LogicalProject(id=[$1])
+   *        LogicalTableScan(table=[[profile]])
+   *
+   *    And the corresponding expressions are
+   *       inputs : EnumerableTableScan (Which is the output of LogicalTableScan)
+   *       nodes : [$1] Which essentially means take pick the first column from the input
+   *
+   *
+   *    This function converts the LogicalProject expression "[$1]" with input RexNode which is an output of TableScan
+   *    to a java code that implements the interface {@link Expression}
+   *
+   * @param inputs Input relations/time-varying relations for this row expression
+   * @param nodes relational expressions that needs to be converted to java code.
+   * @return compiled expression of type {@link org.apache.samza.sql.data.Expression}
+   */
+  public org.apache.samza.sql.data.Expression compile(List<RelNode> inputs, List<RexNode> nodes) {
+    /*
+     *  In case there are multiple input relations, we build a single input row type combining types of all the inputs.
+     */
+    final RelDataTypeFactory.FieldInfoBuilder fieldBuilder = rexBuilder.getTypeFactory().builder();
+    for (RelNode input : inputs) {
+      fieldBuilder.addAll(input.getRowType().getFieldList());
+    }
+    final RelDataType inputRowType = fieldBuilder.build();
+    final RexProgramBuilder programBuilder = new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+    final RexProgram program = programBuilder.getProgram();
+
+    final BlockBuilder builder = new BlockBuilder();
+    final ParameterExpression executionContext = Expressions.parameter(SamzaSqlExecutionContext.class, "context");
+    final ParameterExpression root = DataContext.ROOT;
+    final ParameterExpression inputValues = Expressions.parameter(Object[].class, "inputValues");
+    final ParameterExpression outputValues = Expressions.parameter(Object[].class, "outputValues");
+    final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    // public void execute(Object[] inputValues, Object[] outputValues)
+    final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList.of(
+        Pair.<org.apache.calcite.linq4j.tree.Expression, PhysType>of(
+            Expressions.variable(Object[].class, "inputValues"),
+            PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false))));
+
+    final List<org.apache.calcite.linq4j.tree.Expression> list =
+        RexToLixTranslator.translateProjects(program, javaTypeFactory, builder, null, DataContext.ROOT, inputGetter,
+            null);
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(Expressions.statement(
+          Expressions.assign(Expressions.arrayIndex(outputValues, Expressions.constant(i)), list.get(i))));
+    }
+    return createSamzaExpressionFromCalcite(executionContext, root, inputValues, outputValues, builder.toBlock());
+  }
+
+  /**
+   * This method takes the java statement block, inputs, outputs needed by the statement block to create an object
+   * of class that implements the interface {@link Expression}
+   *
+   * for e.g.
+   *   Query : select id from profile
+   *      where profile table has relational schema with id(NUMBER) and name(VARCHAR) columns.
+   *    This query will result in the following relational plan
+   *      LogicalProject(id=[$1])
+   *        LogicalTableScan(table=[[profile]])
+   *
+   *
+   *    And the corresponding expressions are
+   *       inputs : EnumerableTableScan (Which is the output of LogicalTableScan)
+   *       nodes : [$1] Which essentially means take pick the first column from the input
+   *
+   *    This expression corresponding to the logicalProject "[$1]" gets converted into a java statement block
+   *    {
+   *      outputValues[0] = (Integer) inputValues[1];
+   *    }
+   *
+   *    This method converts this statement block into an equivalent {@link Expression} object whose execute methods
+   *    execute the above java statement block
+   *
+   */
+  static org.apache.samza.sql.data.Expression createSamzaExpressionFromCalcite(ParameterExpression executionContext,
+      ParameterExpression dataContext, ParameterExpression inputValues, ParameterExpression outputValues,
+      BlockStatement block) {
+    final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+    // public void execute(Object[] inputValues, Object[] outputValues)
+    declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, void.class, SamzaBuiltInMethod.EXPR_EXECUTE2.method.getName(),
+            ImmutableList.of(executionContext, dataContext, inputValues, outputValues), block));
+
+    final ClassDeclaration classDeclaration = Expressions.classDecl(Modifier.PUBLIC, "SqlExpression", null,
+        ImmutableList.<Type>of(org.apache.samza.sql.data.Expression.class), declarations);
+    String s = Expressions.toString(declarations, "\n", false);
+
+    log.info("Generated code for expression: {}", s);
+
+    try {
+      return getExpression(classDeclaration, s);
+    } catch (Exception e) {
+      throw new SamzaException("Expression compilation failure.", e);
+    }
+  }
+
+  /**
+   * Creates the instance of the class defined in {@link ClassDeclaration}
+   * @param expr Interface whose instance needs to be created.
+   * @param s The java code that implements the interface which should be used to create the instance.
+   * @return The object of the class which implements the interface {@link Expression} with the code that is passed as input.
+   * @throws CompileException
+   * @throws IOException
+   */
+  static Expression getExpression(ClassDeclaration expr, String s) throws CompileException, IOException {
+    ICompilerFactory compilerFactory;
+    try {
+      compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to instantiate java compiler", e);
+    }
+    IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
+    cbe.setClassName(expr.name);
+    cbe.setImplementedInterfaces(expr.implemented.toArray(new Class[expr.implemented.size()]));
+    cbe.setParentClassLoader(RexToJavaCompiler.class.getClassLoader());
+    cbe.setDebuggingInformation(true, true, true);
+
+    return (org.apache.samza.sql.data.Expression) cbe.createInstance(new StringReader(s));
+  }
+
+  /**
+   * Represents the methods in the class {@link Expression}
+   */
+  public enum SamzaBuiltInMethod {
+    EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute", SamzaSqlExecutionContext.class,
+        DataContext.class, Object[].class, Object[].class);
+
+    public final Method method;
+
+    /**
+     * Defines a method.
+     */
+    SamzaBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
+      this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
new file mode 100644
index 0000000..88bcb61
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+public class SamzaSqlExecutionContext {
+
+  private final SamzaSqlApplicationConfig sqlConfig;
+  private final Map<String, UdfMetadata> udfMetadata;
+  private final Map<String, ScalarUdf> udfInstances = new HashMap<>();
+
+  public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) {
+    this.sqlConfig = config;
+    udfMetadata =
+        this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName, Function.identity()));
+  }
+
+  public ScalarUdf getOrCreateUdf(String clazz, String udfName) {
+    return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, udfName));
+  }
+
+  public ScalarUdf createInstance(String clazz, String udfName) {
+    Config udfConfig = udfMetadata.get(udfName).getUdfConfig();
+    ScalarUdf scalarUdf = ReflectionUtils.createInstance(clazz);
+    if (scalarUdf == null) {
+      String msg = String.format("Couldn't create udf %s of class %s", udfName, clazz);
+      throw new SamzaException(msg);
+    }
+    scalarUdf.init(udfConfig);
+    return scalarUdf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
new file mode 100644
index 0000000..b5df545
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -0,0 +1,123 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+
+/**
+ * Samza sql relational message. Each Samza sql relational message represents a relational row in a table.
+ * Each row of the relational table and hence SamzaSqlRelMessage consists of list of column values and
+ * their associated column names. Right now we donot store any other metadata other than the column name in the
+ * SamzaSqlRelationalMessage, In future if we find a need, we could add additional column ddl metadata around
+ * primary Key, nullability, etc.
+ */
+public class SamzaSqlRelMessage {
+
+  public static final String KEY_NAME = "__key__";
+
+  private final List<Object> value = new ArrayList<>();
+  private final List<Object> relFieldValues = new ArrayList<>();
+  private final List<String> names = new ArrayList<>();
+  private final Object key;
+
+  /**
+   * Create the SamzaSqlRelMessage, Each rel message represents a row in the table.
+   * So it can contain a key and a list of fields in the row.
+   * @param key Represents the key in the row, Key is optional, in which case it can be null.
+   * @param names Ordered list of field names in the row.
+   * @param values Ordered list of all the values in the row. Since the samzaSqlRelMessage can represent
+   *               the row in a change capture event stream, It can contain delete messages in which case
+   *               all the fields in the row can be null.
+   */
+  public SamzaSqlRelMessage(Object key, List<String> names, List<Object> values) {
+    this.key = key;
+    this.value.addAll(values);
+    this.names.addAll(names);
+    if (key != null) {
+      this.relFieldValues.add(key);
+    }
+    this.relFieldValues.addAll(values);
+  }
+
+  /**
+   * Get the field names of all the columns in the relational message.
+   * @return the field names of all columns.
+   */
+  public List<String> getFieldNames() {
+    return names;
+  }
+
+  /**
+   * Get the values of all the columns in the relational message.
+   * @return the values of all the columns
+   */
+  public List<Object> getFieldValues() {
+    return value;
+  }
+
+  public List<Object> getRelFieldValues() {
+    return this.relFieldValues;
+  }
+
+  public Object getKey() {
+    return key;
+  }
+
+  /**
+   * Get the value of the field corresponding to the field name.
+   * @param name Name of the field.
+   * @return returns the value of the field.
+   */
+  public Optional<Object> getField(String name) {
+    for (int index = 0; index < names.size(); index++) {
+      if (names.get(index).equals(name)) {
+        return Optional.ofNullable(value.get(index));
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
+   * If the field list contains KEY, then it extracts the key out of the fields to create the
+   * RelMessage with key and values.
+   * @param fieldValues Field values that can include the key as well.
+   * @param fieldNames Field names in the rel message that can include the special __key__
+   * @return Created SamzaSqlRelMessage.
+   */
+  public static SamzaSqlRelMessage createRelMessage(List<Object> fieldValues, List<String> fieldNames) {
+    int keyIndex = fieldNames.indexOf(KEY_NAME);
+    fieldNames = new ArrayList<>(fieldNames);
+    fieldValues = new ArrayList<>(fieldValues);
+    Object key = null;
+    if (keyIndex != -1) {
+      key = fieldValues.get(keyIndex);
+      fieldValues.remove(keyIndex);
+      fieldNames.remove(keyIndex);
+    }
+
+    return new SamzaSqlRelMessage(key, fieldNames, fieldValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
new file mode 100644
index 0000000..39a26ce
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+public class FlattenUdf implements ScalarUdf {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  public Object execute(Object... arg) {
+    List value = (List) arg[0];
+    return value != null && !value.isEmpty() ? value.get(0) : value;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
new file mode 100644
index 0000000..1203b25
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SourceResolverFactory;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Schema Resolver that uses static config to return a config corresponding to a system stream.
+ * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ */
+public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedSourceResolverFactory.class);
+
+  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
+
+  @Override
+  public SourceResolver create(Config config) {
+    return new ConfigBasedSourceResolver(config);
+  }
+
+  private class ConfigBasedSourceResolver implements SourceResolver {
+    private final Config config;
+
+    public ConfigBasedSourceResolver(Config config) {
+      this.config = config;
+    }
+
+    @Override
+    public SqlSystemStreamConfig fetchSourceInfo(String source) {
+      String[] sourceComponents = source.split("\\.");
+      if (sourceComponents.length != 2) {
+        String msg = String.format("Source %s is not of the format <system>.<stream>", source);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      String systemName = sourceComponents[0];
+      String streamName = sourceComponents[1];
+
+      return new SqlSystemStreamConfig(systemName, streamName, fetchSystemConfigs(systemName));
+    }
+
+    private Config fetchSystemConfigs(String systemName) {
+      return config.subset(systemName + ".");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
new file mode 100644
index 0000000..c98a4a1
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.interfaces.UdfResolver;
+import org.apache.samza.sql.udfs.ScalarUdf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Udf resolver that uses static config to return the UDFs present in the Samza SQL application
+ * All the UDF classes are provided to this factory as a comma separated list of values for the config named
+ * "udfClasses".
+ * This factory loads all the udf classes that are configured, performs the validation to ensure that they extend
+ * {@link ScalarUdf} and implement the method named "execute"
+ */
+public class ConfigBasedUdfResolver implements UdfResolver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedUdfResolver.class);
+  public static final String CFG_UDF_CLASSES = "udfClasses";
+  public static final String UDF_METHOD_NAME = "execute";
+
+  private final ArrayList<UdfMetadata> udfs;
+
+  public ConfigBasedUdfResolver(Properties config, Config udfConfig) {
+    List<String> udfClasses = Arrays.stream(config.getProperty(CFG_UDF_CLASSES, "").split(","))
+        .filter(StringUtils::isNotBlank)
+        .collect(Collectors.toList());
+    udfs = new ArrayList<>();
+    Class<?> udfClass;
+    for (String udfClassName : udfClasses) {
+      try {
+        udfClass = Class.forName(udfClassName);
+      } catch (ClassNotFoundException e) {
+        String msg = String.format("Couldn't load the udf class %s", udfClassName);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
+      }
+
+      if (!ScalarUdf.class.isAssignableFrom(udfClass)) {
+        String msg = String.format("Udf class %s is not extended from %s", udfClassName, ScalarUdf.class.getName());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      Optional<Method> udfMethod =
+          Arrays.stream(udfClass.getMethods()).filter(x -> x.getName().equals(UDF_METHOD_NAME)).findFirst();
+
+      if (!udfMethod.isPresent()) {
+        String msg = String.format("Udf Class %s doesn't implement method named %s", udfClassName, UDF_METHOD_NAME);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      int udfIndex = udfClass.getSimpleName().toLowerCase().lastIndexOf("udf");
+      String udfName = udfClass.getSimpleName().substring(0, udfIndex);
+
+      udfs.add(new UdfMetadata(udfName, udfMethod.get(), udfConfig.subset(udfName + ".")));
+    }
+  }
+
+  @Override
+  public Collection<UdfMetadata> getUdfs() {
+    return udfs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
new file mode 100644
index 0000000..ee95224
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+
+/**
+ * Samza SQL application uses {@link RelSchemaProvider} to get the relational schema corresponding to the system
+ * and stream.
+ */
+public interface RelSchemaProvider {
+  /**
+   * Converts the schema to relational schema. It is possible that there can be multiple tables associated
+   * within a single schema.
+   * @return Relational schema corresponding to the system stream.
+   */
+  RelDataType getRelationalSchema();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
new file mode 100644
index 0000000..c614cdf
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+public interface RelSchemaProviderFactory {
+
+  /**
+   * Create a {@link RelSchemaProvider} given the config
+   * @param config Config needed to create the {@link RelSchemaProvider}
+   * @return {@link RelSchemaProvider} object created.
+   */
+  RelSchemaProvider create(SystemStream systemStream, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
new file mode 100644
index 0000000..12d5f28
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * Samza SQL application uses {@link SamzaRelConverter} to convert the Samza messages to relational message before
+ * it can be processed by the calcite engine.
+ * The {@link SamzaRelConverter} is configurable at a system level, So it is possible to configure different
+ * {@link SamzaRelConverter} for different systems.
+ */
+public interface SamzaRelConverter {
+  /**
+   * Converts the object to relational message corresponding to the tableName with relational schema.
+   * @param message samza message that needs to be converted.
+   * @return Relational message extracted from the object.
+   */
+  SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> message);
+
+  /**
+   * Convert the relational message to the output message.
+   * @param relMessage relational message that needs to be converted.
+   * @return the key and value of the Samza message
+   */
+  KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
new file mode 100644
index 0000000..f239df6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory that is used to create {@link SamzaRelConverter}
+ */
+public interface SamzaRelConverterFactory {
+
+  /**
+   * Create a {@link SamzaRelConverter}. This method is called when the framework wants to create the
+   * {@link SamzaRelConverter} corresponding to the system.
+   * @param config
+   *  config that is used to create the object
+   * @return Returns the object created.
+   */
+  SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
new file mode 100644
index 0000000..ac3fd31
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
@@ -0,0 +1,34 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+/**
+ * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemStreamConfig} corresponding to the source.
+ */
+public interface SourceResolver {
+  /**
+   * Returns the SystemStream config corresponding to the source name
+   * @param sourceName
+   *  source whose systemstreamconfig needs to be fetched.
+   * @return
+   *  System stream config corresponding to the source.
+   */
+  SqlSystemStreamConfig fetchSourceInfo(String sourceName);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
new file mode 100644
index 0000000..274a6b1
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory that is used to create {@link SourceResolver}.
+ */
+public interface SourceResolverFactory {
+
+  /**
+   * Create the {@link SourceResolver}. This is called during the application initialization.
+   * @param config config for the SourceResolver
+   * @return Returns the created {@link SourceResolver}
+   */
+  SourceResolver create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
new file mode 100644
index 0000000..df21784
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
@@ -0,0 +1,74 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configs associated with a system stream.
+ */
+public class SqlSystemStreamConfig {
+
+  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+  private final String systemName;
+
+  private final String streamName;
+
+  private final String samzaRelConverterName;
+  private final SystemStream systemStream;
+  private String relSchemaProviderName;
+
+  public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
+
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.systemStream = new SystemStream(systemName, streamName);
+
+    samzaRelConverterName = systemConfig.get(CFG_SAMZA_REL_CONVERTER);
+    relSchemaProviderName = systemConfig.get(CFG_REL_SCHEMA_PROVIDER);
+    Validate.notEmpty(samzaRelConverterName,
+        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getSamzaRelConverterName() {
+    return samzaRelConverterName;
+  }
+
+  public String getRelSchemaProviderName() {
+    return relSchemaProviderName;
+  }
+
+  public SystemStream getSystemStream() {
+    return systemStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
new file mode 100644
index 0000000..b1a2d6d
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.lang.reflect.Method;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Metadata corresponding to the Udf
+ */
+public class UdfMetadata {
+
+  private final String name;
+
+  private final Method udfMethod;
+
+  private final Config udfConfig;
+
+  public UdfMetadata(String name, Method udfMethod, Config udfConfig) {
+    this.name = name;
+    this.udfMethod = udfMethod;
+    this.udfConfig = udfConfig;
+  }
+
+  public Config getUdfConfig() {
+    return udfConfig;
+  }
+
+  /**
+   * @return Returns the instance of the {@link Method} corresponding to the UDF.
+   */
+  public Method getUdfMethod() {
+    return udfMethod;
+  }
+
+  /**
+   * @return Returns the name of the Udf.
+   */
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
new file mode 100644
index 0000000..7e84118
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.util.Collection;
+
+/**
+ * Plugin that is used to discover the UDFs available for the Samza SQL.
+ */
+public interface UdfResolver {
+
+  /**
+   * This method is called by the framework during initialization to fetch the {@link UdfMetadata} for all the UDFs
+   * available in the system.
+   * @return {@link UdfMetadata} for all the Udfs available in the system.
+   */
+  Collection<UdfMetadata> getUdfs();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
new file mode 100644
index 0000000..851c895
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.collect.Maps;
+
+
+class Checker implements SqlOperandTypeChecker {
+  private SqlOperandCountRange range;
+
+  public static final Checker ANY_CHECKER = new Checker();
+  private static final Map<Pair<Integer, Integer>, Checker> checkerMap = Maps.newHashMap();
+
+  public static Checker getChecker(int min, int max) {
+    final Pair<Integer, Integer> range = Pair.of(min, max);
+    if (checkerMap.containsKey(range)) {
+      return checkerMap.get(range);
+    }
+
+    final Checker newChecker;
+    if (min == max) {
+      newChecker = new Checker(min);
+    } else {
+      newChecker = new Checker(min, max);
+    }
+
+    checkerMap.put(range, newChecker);
+    return newChecker;
+  }
+
+  private Checker(int size) {
+    range = SqlOperandCountRanges.of(size);
+  }
+
+  private Checker(int min, int max) {
+    range = SqlOperandCountRanges.between(min, max);
+  }
+
+  private Checker() {
+    range = SqlOperandCountRanges.any();
+  }
+
+  @Override
+  public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+    return true;
+  }
+
+  @Override
+  public SqlOperandCountRange getOperandCountRange() {
+    return range;
+  }
+
+  @Override
+  public String getAllowedSignatures(SqlOperator op, String opName) {
+    return opName + "(Drill - Opaque)";
+  }
+
+  @Override
+  public Consistency getConsistency() {
+    return Consistency.NONE;
+  }
+
+  @Override
+  public boolean isOptional(int i) {
+    return false;
+  }
+}
\ No newline at end of file