You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/15 22:57:01 UTC

[1/2] samza git commit: SAMZA-649 Move SQL front end files depending on Calcite to samza-sql-calcite

Repository: samza
Updated Branches:
  refs/heads/samza-sql bc5833b0d -> 46a6de437


http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java b/samza-sql-core/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
deleted file mode 100644
index b4ac5f5..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.test.metadata;
-
-import org.apache.avro.Schema;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.samza.sql.metadata.AvroSchemaConverter;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestAvroSchemaConverter {
-  public static final String SIMPLE_RECORD_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
-      " \"type\": \"record\",\n" +
-      " \"name\": \"User\",\n" +
-      " \"fields\": [\n" +
-      "     {\"name\": \"name\", \"type\": \"string\"},\n" +
-      "     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n" +
-      "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
-      " ]\n" +
-      "}";
-
-  public static final Schema simpleRecord = new Schema.Parser().parse(SIMPLE_RECORD_SCHEMA);
-  @Test
-  public void testSimpleAvroRecord(){
-    RelDataTypeFactory relDataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
-    AvroSchemaConverter schemaConverter = new AvroSchemaConverter(relDataTypeFactory, simpleRecord);
-
-    RelDataType relDataType = schemaConverter.convert();
-
-    Assert.assertEquals(SqlTypeName.VARCHAR, relDataType.getField("name", false, false).getType().getSqlTypeName());
-    Assert.assertEquals(SqlTypeName.INTEGER, relDataType.getField("favorite_number", false, false).getType().getSqlTypeName());
-    Assert.assertTrue(relDataType.getField("favorite_number", false, false).getType().isNullable());
-    Assert.assertEquals(SqlTypeName.VARCHAR, relDataType.getField("favorite_color", false, false).getType().getSqlTypeName());
-    Assert.assertTrue(relDataType.getField("favorite_color", false, false).getType().isNullable());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 5cbb755..544bfc3 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -27,7 +27,8 @@ include \
   'samza-shell',
   'samza-yarn',
   'samza-test',
-  'samza-sql-core'
+  'samza-sql-core',
+  'samza-sql-calcite'
 
 rootProject.children.each {
   if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {


[2/2] samza git commit: SAMZA-649 Move SQL front end files depending on Calcite to samza-sql-calcite

Posted by ni...@apache.org.
SAMZA-649 Move SQL front end files depending on Calcite to samza-sql-calcite


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/46a6de43
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/46a6de43
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/46a6de43

Branch: refs/heads/samza-sql
Commit: 46a6de437202b199273c0602ac15db515a6040b9
Parents: bc5833b
Author: Milinda L. Pathirage <mi...@gmail.com>
Authored: Wed Apr 15 13:36:50 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-ld1.linkedin.biz>
Committed: Wed Apr 15 13:36:50 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |  11 +
 .../samza/sql/calcite/planner/QueryPlanner.java | 212 +++++++++++
 .../calcite/planner/SamzaCalciteConnection.java | 373 +++++++++++++++++++
 .../planner/SamzaQueryPreparingStatement.java   | 111 ++++++
 .../sql/calcite/planner/SamzaSqlValidator.java  |  57 +++
 .../sql/calcite/schema/AvroSchemaConverter.java | 132 +++++++
 .../planner/SamzaStreamTableFactory.java        |  94 +++++
 .../sql/calcite/planner/TestQueryPlanner.java   |  94 +++++
 .../calcite/schema/TestAvroSchemaConverter.java |  55 +++
 .../samza/sql/metadata/AvroSchemaConverter.java | 132 -------
 .../apache/samza/sql/planner/QueryPlanner.java  | 214 -----------
 .../sql/planner/SamzaCalciteConnection.java     | 373 -------------------
 .../planner/SamzaQueryPreparingStatement.java   | 111 ------
 .../samza/sql/planner/SamzaSqlValidator.java    |  54 ---
 .../samza/sql/planner/QueryPlannerTest.java     |  94 -----
 .../sql/planner/SamzaStreamTableFactory.java    |  94 -----
 .../test/metadata/TestAvroSchemaConverter.java  |  57 ---
 settings.gradle                                 |   3 +-
 18 files changed, 1141 insertions(+), 1130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a1c7133..a042567 100644
--- a/build.gradle
+++ b/build.gradle
@@ -279,6 +279,17 @@ project(":samza-sql-core_$scalaVersion") {
   }
 }
 
+project(":samza-sql-calcite_$scalaVersion") {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(":samza-sql-core_$scalaVersion")
+    compile "org.apache.calcite:calcite-core:$calciteVersion"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+  }
+}
+
 project(":samza-shell") {
   apply plugin: 'java'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java
new file mode 100644
index 0000000..e1c22e9
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.rules.*;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.List;
+
+/**
+ * Streaming query planner implementation based on Calcite.
+ */
+public class QueryPlanner {
+  public static final boolean COMMUTE =
+      "true".equals(
+          System.getProperties().getProperty("calcite.enable.join.commute"));
+
+  /**
+   * Whether to enable the collation trait. Some extra optimizations are
+   * possible if enabled, but queries should work either way. At some point
+   * this will become a preference, or we will run multiple phases: first
+   * disabled, then enabled.
+   */
+  private static final boolean ENABLE_COLLATION_TRAIT = true;
+
+  private static final List<RelOptRule> DEFAULT_RULES =
+      ImmutableList.of(
+          AggregateStarTableRule.INSTANCE,
+          AggregateStarTableRule.INSTANCE2,
+          TableScanRule.INSTANCE,
+          COMMUTE
+              ? JoinAssociateRule.INSTANCE
+              : ProjectMergeRule.INSTANCE,
+          FilterTableScanRule.INSTANCE,
+          ProjectFilterTransposeRule.INSTANCE,
+          FilterProjectTransposeRule.INSTANCE,
+          FilterJoinRule.FILTER_ON_JOIN,
+          AggregateExpandDistinctAggregatesRule.INSTANCE,
+          AggregateReduceFunctionsRule.INSTANCE,
+          FilterAggregateTransposeRule.INSTANCE,
+          JoinCommuteRule.INSTANCE,
+          JoinPushThroughJoinRule.RIGHT,
+          JoinPushThroughJoinRule.LEFT,
+          SortProjectTransposeRule.INSTANCE);
+
+  private static final List<RelOptRule> ENUMERABLE_RULES =
+      ImmutableList.of(
+          EnumerableRules.ENUMERABLE_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_CORRELATE_RULE,
+          EnumerableRules.ENUMERABLE_PROJECT_RULE,
+          EnumerableRules.ENUMERABLE_FILTER_RULE,
+          EnumerableRules.ENUMERABLE_AGGREGATE_RULE,
+          EnumerableRules.ENUMERABLE_SORT_RULE,
+          EnumerableRules.ENUMERABLE_LIMIT_RULE,
+          EnumerableRules.ENUMERABLE_COLLECT_RULE,
+          EnumerableRules.ENUMERABLE_UNCOLLECT_RULE,
+          EnumerableRules.ENUMERABLE_UNION_RULE,
+          EnumerableRules.ENUMERABLE_INTERSECT_RULE,
+          EnumerableRules.ENUMERABLE_MINUS_RULE,
+          EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE,
+          EnumerableRules.ENUMERABLE_VALUES_RULE,
+          EnumerableRules.ENUMERABLE_WINDOW_RULE,
+          EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE);
+
+  private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
+      ImmutableList.of(
+          ReduceExpressionsRule.PROJECT_INSTANCE,
+          ReduceExpressionsRule.FILTER_INSTANCE,
+          ReduceExpressionsRule.CALC_INSTANCE,
+          ReduceExpressionsRule.JOIN_INSTANCE,
+          ValuesReduceRule.FILTER_INSTANCE,
+          ValuesReduceRule.PROJECT_FILTER_INSTANCE,
+          ValuesReduceRule.PROJECT_INSTANCE);
+
+  /**
+   * Transform streaming query to a query plan.
+   * @param query streaming query in SQL with streaming extensions
+   * @param context query prepare context
+   * @return query plan
+   */
+  public RelNode getPlan(String query, CalcitePrepare.Context context) {
+    final JavaTypeFactory typeFactory = context.getTypeFactory();
+    final CalciteConnectionConfig config = context.config();
+
+    CalciteCatalogReader catalogReader = new CalciteCatalogReader(context.getRootSchema(),
+        false,
+        context.getDefaultSchemaPath(),
+        typeFactory);
+
+    SqlParser sqlParser = SqlParser.create(query,
+        SqlParser.configBuilder()
+            .setQuotedCasing(config.quotedCasing())
+            .setUnquotedCasing(config.unquotedCasing())
+            .setQuoting(config.quoting())
+            .build());
+
+    SqlNode sqlNode;
+
+    try {
+      sqlNode = sqlParser.parseStmt();
+    } catch (SqlParseException e) {
+      throw new RuntimeException("parse failed: " + e.getMessage(), e);
+    }
+
+    final ChainedSqlOperatorTable operatorTable =
+        new ChainedSqlOperatorTable(
+            ImmutableList.of(SqlStdOperatorTable.instance(), catalogReader));
+
+    final SqlValidator validator =
+        new SamzaSqlValidator(operatorTable, catalogReader, typeFactory);
+    validator.setIdentifierExpansion(true);
+
+    SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+    final RelOptPlanner planner = createStreamingRelOptPlanner(context, null, null);
+
+    final SamzaQueryPreparingStatement preparingStmt =
+        new SamzaQueryPreparingStatement(
+            context,
+            catalogReader,
+            typeFactory,
+            context.getRootSchema(),
+            EnumerableRel.Prefer.ARRAY,
+            planner,
+            EnumerableConvention.INSTANCE);
+
+    /* TODO: Add query optimization. */
+
+    return preparingStmt.getSqlToRelConverter(validator, catalogReader).convertQuery(validatedSqlNode, false, true);
+  }
+
+  /**
+   * Creates a query planner and initializes it with a default set of
+   * rules.
+   *
+   * @param prepareContext  context for preparing a statement
+   * @param externalContext external query planning context
+   * @param costFactory     cost factory for cost based query planning
+   * @return relation query planner instance
+   */
+  protected RelOptPlanner createStreamingRelOptPlanner(final CalcitePrepare.Context prepareContext,
+                                                       org.apache.calcite.plan.Context externalContext,
+                                                       RelOptCostFactory costFactory) {
+    if (externalContext == null) {
+      externalContext = Contexts.withConfig(prepareContext.config());
+    }
+
+    final VolcanoPlanner planner =
+        new VolcanoPlanner(costFactory, externalContext);
+
+    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
+
+    if (ENABLE_COLLATION_TRAIT) {
+      planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
+      planner.registerAbstractRelationalRules();
+    }
+    RelOptUtil.registerAbstractRels(planner);
+    for (RelOptRule rule : DEFAULT_RULES) {
+      planner.addRule(rule);
+    }
+
+    /* Note: Bindable rules were removed until Calcite switches the convention of the root node to bindable. */
+
+    for (RelOptRule rule : ENUMERABLE_RULES) {
+      planner.addRule(rule);
+    }
+
+    for (RelOptRule rule : StreamRules.RULES) {
+      planner.addRule(rule);
+    }
+
+    /* Note: Constant reduction rules were removed because current Calcite implementation doesn't use them. */
+
+    return planner;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java
new file mode 100644
index 0000000..64c8f83
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteRootSchema;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * Minimal <code>org.apache.calcite.jdbc.CalciteConnection</code> implementation which enables
+ * re-use of Calcite code.
+ */
+public class SamzaCalciteConnection implements CalciteConnection {
+  private static final String INLINE = "inline:";
+  private final JavaTypeFactory typeFactory;
+  private final CalciteRootSchema rootSchema;
+  private String schema;
+
+  public SamzaCalciteConnection(String model) throws IOException {
+    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    rootSchema = CalciteSchema.createRootSchema(true);
+    new ModelHandler(this, INLINE + model);
+  }
+
+  public CalciteRootSchema getCalciteRootSchema(){
+    return rootSchema;
+  }
+
+  @Override
+  public SchemaPlus getRootSchema() {
+    return rootSchema.plus();
+  }
+
+  @Override
+  public JavaTypeFactory getTypeFactory() {
+    return typeFactory;
+  }
+
+  @Override
+  public Properties getProperties() {
+    return null;
+  }
+
+  @Override
+  public Statement createStatement() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public String nativeSQL(String sql) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+  }
+
+  @Override
+  public boolean getAutoCommit() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void commit() throws SQLException {
+
+  }
+
+  @Override
+  public void rollback() throws SQLException {
+
+  }
+
+  @Override
+  public void close() throws SQLException {
+
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setReadOnly(boolean readOnly) throws SQLException {
+
+  }
+
+  @Override
+  public boolean isReadOnly() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setCatalog(String catalog) throws SQLException {
+
+  }
+
+  @Override
+  public String getCatalog() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setTransactionIsolation(int level) throws SQLException {
+
+  }
+
+  @Override
+  public int getTransactionIsolation() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+  }
+
+  @Override
+  public void setHoldability(int holdability) throws SQLException {
+
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public Savepoint setSavepoint() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void rollback(Savepoint savepoint) throws SQLException {
+
+  }
+
+  @Override
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Clob createClob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Blob createBlob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public NClob createNClob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public SQLXML createSQLXML() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isValid(int timeout) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+  }
+
+  @Override
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+  }
+
+  @Override
+  public String getClientInfo(String name) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Properties getClientInfo() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setSchema(String schema) throws SQLException {
+    this.schema = schema;
+  }
+
+  @Override
+  public String getSchema() throws SQLException {
+    return schema;
+  }
+
+  public void abort(Executor executor) throws SQLException {
+
+  }
+
+  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+  }
+
+  public int getNetworkTimeout() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public CalciteConnectionConfig config() {
+    return new CalciteConnectionConfigImpl(new Properties());
+  }
+
+  @Override
+  public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
+    return null;
+  }
+
+  @Override
+  public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
+    return null;
+  }
+
+  @Override
+  public <T> T execute(Expression expression, Class<T> type) {
+    return null;
+  }
+
+  @Override
+  public <T> T execute(Expression expression, Type type) {
+    return null;
+  }
+
+  @Override
+  public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+    return null;
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java
new file mode 100644
index 0000000..89c4836
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.Maps;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+
+import java.util.List;
+import java.util.Map;
+
+public class SamzaQueryPreparingStatement extends Prepare implements RelOptTable.ViewExpander {
+  private final RelOptPlanner planner;
+  private final RexBuilder rexBuilder;
+  protected final CalciteSchema schema;
+  protected final RelDataTypeFactory typeFactory;
+  private final EnumerableRel.Prefer prefer;
+  private final Map<String, Object> internalParameters =
+      Maps.newLinkedHashMap();
+  private int expansionDepth;
+  private SqlValidator sqlValidator;
+
+  public SamzaQueryPreparingStatement(CalcitePrepare.Context context, CatalogReader catalogReader,
+                                      RelDataTypeFactory typeFactory,
+                                      CalciteSchema schema,
+                                      EnumerableRel.Prefer prefer,
+                                      RelOptPlanner planner,
+                                      Convention resultConvention) {
+    super(context, catalogReader, resultConvention);
+    this.schema = schema;
+    this.prefer = prefer;
+    this.planner = planner;
+    this.typeFactory = typeFactory;
+    this.rexBuilder = new RexBuilder(typeFactory);
+  }
+
+  @Override
+  protected PreparedResult createPreparedExplanation(RelDataType resultType, RelDataType parameterRowType, RelNode rootRel, boolean explainAsXml, SqlExplainLevel detailLevel) {
+    return null;
+  }
+
+  @Override
+  protected PreparedResult implement(RelDataType rowType, RelNode rootRel, SqlKind sqlKind) {
+    return null;
+  }
+
+  @Override
+  protected SqlToRelConverter getSqlToRelConverter(SqlValidator validator, CatalogReader catalogReader) {
+    SqlToRelConverter sqlToRelConverter =
+        new SqlToRelConverter(
+            this, validator, catalogReader, planner, rexBuilder,
+            StandardConvertletTable.INSTANCE);
+    sqlToRelConverter.setTrimUnusedFields(true);
+    return sqlToRelConverter;
+  }
+
+  @Override
+  public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
+    return null;
+  }
+
+  @Override
+  protected RelNode decorrelate(SqlToRelConverter sqlToRelConverter, SqlNode query, RelNode rootRel) {
+    return null;
+  }
+
+  @Override
+  protected void init(Class runtimeContextClass) {}
+
+  @Override
+  protected SqlValidator getSqlValidator() {
+    return null;
+  }
+
+  @Override
+  public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+    // TODO: Implement custom view expansions
+    return super.expandView(rowType, queryString, schemaPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java
new file mode 100644
index 0000000..e0f9cd8
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+/**
+ * Defines a Samza specific SQL validator based on Calcite's SQL validator implementation.
+ */
+public class SamzaSqlValidator extends SqlValidatorImpl{
+  /**
+   * Creates a validator.
+   *
+   * @param opTab         Operator table
+   * @param catalogReader Catalog reader
+   * @param typeFactory   Type factory
+   */
+  protected SamzaSqlValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
+    /* Note: We may need to define Samza specific SqlConformance instance in future. */
+    super(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT);
+  }
+
+  @Override
+  protected RelDataType getLogicalSourceRowType(
+      RelDataType sourceRowType, SqlInsert insert) {
+    return ((JavaTypeFactory) typeFactory).toSql(sourceRowType);
+  }
+
+  @Override
+  protected RelDataType getLogicalTargetRowType(
+      RelDataType targetRowType, SqlInsert insert) {
+    return ((JavaTypeFactory) typeFactory).toSql(targetRowType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java
new file mode 100644
index 0000000..705c0ff
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.schema;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts an Avro schema to Calcite RelDataType.
+ * <p></p>
+ * <p>Inspired by parquet-mr.</p>
+ */
+public class AvroSchemaConverter {
+
+  private final RelDataTypeFactory relDataTypeFactory;
+  private final Schema rootSchema;
+
+  public AvroSchemaConverter(RelDataTypeFactory relDataTypeFactory, Schema schema) {
+    this.relDataTypeFactory = relDataTypeFactory;
+    this.rootSchema = schema;
+  }
+
+  public RelDataType convert() {
+    // At the top level only records are supported
+    if (rootSchema.getType() != Schema.Type.RECORD) {
+      throw new RuntimeException(
+          String.format("Type: %s is unsupported at this level; Only Record type of supported at top level!",
+              rootSchema.getType()));
+    }
+
+    return convertRecord(rootSchema, true);
+  }
+
+  private RelDataType convertRecord(Schema recordSchema, boolean isRoot) {
+    RelDataTypeFactory.FieldInfoBuilder builder = relDataTypeFactory.builder();
+
+    for (Schema.Field field : recordSchema.getFields()) {
+      Schema fieldSchema = field.schema();
+      if (fieldSchema.getType() == Schema.Type.NULL) {
+        continue;
+      }
+
+      convertField(builder, field.name(), fieldSchema);
+    }
+
+    RelDataType record = builder.build();
+    if(isRoot) {
+      // Record at root level is treated differently.
+      return record;
+    }
+
+    return relDataTypeFactory.createStructType(record.getFieldList());
+  }
+
+  private void convertField(RelDataTypeFactory.FieldInfoBuilder builder,
+                            String fieldName,
+                            Schema fieldSchema) {
+    builder.add(fieldName, convertFieldType(fieldSchema));
+  }
+
+  private RelDataType convertFieldType(Schema elementType) {
+    Schema.Type type = elementType.getType();
+    if (type == Schema.Type.STRING) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+    } else if (type == Schema.Type.INT) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+    } else if (type == Schema.Type.BOOLEAN) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+    } else if (type == Schema.Type.BYTES) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.BINARY);
+    } else if (type == Schema.Type.LONG) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+    } else if (type == Schema.Type.DOUBLE) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+    } else if (type == Schema.Type.FLOAT) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+    } else if (type == Schema.Type.ARRAY) {
+      return relDataTypeFactory.createArrayType(convertFieldType(elementType), -1);
+    } else if (type == Schema.Type.RECORD) {
+      return convertRecord(elementType, false);
+    } else if (type == Schema.Type.MAP) {
+      return relDataTypeFactory.createMapType(relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
+          convertFieldType(elementType.getValueType()));
+    } else if (type == Schema.Type.FIXED) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.VARBINARY, elementType.getFixedSize());
+    } else if (type == Schema.Type.UNION) {
+      List<Schema> types = elementType.getTypes();
+      List<Schema> nonNullTypes = new ArrayList<Schema>();
+      boolean foundNull = false;
+
+      for(Schema s : types) {
+        if(s.getType() == Schema.Type.NULL){
+          foundNull = true;
+        } else {
+          nonNullTypes.add(s);
+        }
+      }
+
+      if(nonNullTypes.size() > 1){
+        throw new RuntimeException("Multiple non null types in a union is not supported.");
+      } else {
+        return relDataTypeFactory.createTypeWithNullability(convertFieldType(nonNullTypes.get(0)), foundNull);
+      }
+    } else if(type == Schema.Type.ENUM) {
+      // TODO: May be there is a better way to handle enums
+      relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+    }
+
+    return relDataTypeFactory.createSqlType(SqlTypeName.ANY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java
new file mode 100644
index 0000000..fd87aa5
--- /dev/null
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.Map;
+
+public class SamzaStreamTableFactory implements TableFactory<Table> {
+    public Table create(SchemaPlus schema, String name,
+                        Map<String, Object> operand, RelDataType rowType) {
+        final RelProtoDataType protoRowType = new RelProtoDataType() {
+            public RelDataType apply(RelDataTypeFactory a0) {
+                return a0.builder()
+                        .add("id", SqlTypeName.INTEGER)
+                        .add("product", SqlTypeName.VARCHAR, 10)
+                        .add("quantity", SqlTypeName.INTEGER)
+                        .build();
+            }
+        };
+        final ImmutableList<Object[]> rows = ImmutableList.of(
+                new Object[]{1, "paint", 10},
+                new Object[]{2, "paper", 5});
+
+        return new StreamableTable() {
+            public Table stream() {
+                return new OrdersTable(protoRowType, rows);
+            }
+
+            public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+                return protoRowType.apply(typeFactory);
+            }
+
+            public Statistic getStatistic() {
+                return Statistics.UNKNOWN;
+            }
+
+            public Schema.TableType getJdbcTableType() {
+                return Schema.TableType.TABLE;
+            }
+        };
+    }
+
+    public static class OrdersTable implements ScannableTable {
+        private final RelProtoDataType protoRowType;
+        private final ImmutableList<Object[]> rows;
+
+        public OrdersTable(RelProtoDataType protoRowType,
+                           ImmutableList<Object[]> rows) {
+            this.protoRowType = protoRowType;
+            this.rows = rows;
+        }
+
+        public Enumerable<Object[]> scan(DataContext root) {
+            return Linq4j.asEnumerable(rows);
+        }
+
+        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+            return protoRowType.apply(typeFactory);
+        }
+
+        public Statistic getStatistic() {
+            return Statistics.UNKNOWN;
+        }
+
+        public Schema.TableType getJdbcTableType() {
+            return Schema.TableType.STREAM;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java
new file mode 100644
index 0000000..0bb15b2
--- /dev/null
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.util.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestQueryPlanner {
+  public static final String STREAM_SCHEMA = "     {\n"
+      + "       name: 'STREAMS',\n"
+      + "       tables: [ {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'ORDERS',\n"
+      + "         stream: {\n"
+      + "           stream: true\n"
+      + "         },\n"
+      + "         factory: '" + SamzaStreamTableFactory.class.getName() + "'\n"
+      + "       } ]\n"
+      + "     }\n";
+
+  public static final String STREAM_MODEL = "{\n"
+      + "  version: '1.0',\n"
+      + "  defaultSchema: 'STREAMS',\n"
+      + "   schemas: [\n"
+      + STREAM_SCHEMA
+      + "   ]\n"
+      + "}";
+
+  public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED =
+      "LogicalDelta\n" +
+          "  LogicalProject(id=[$0], product=[$1], quantity=[$2])\n" +
+          "    LogicalFilter(condition=[>($2, 5)])\n" +
+          "      EnumerableTableScan(table=[[STREAMS, ORDERS]])";
+  public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE =
+      "select stream * from orders where quantity > 5";
+
+  @Test
+  public void testQueryPlanner() throws IOException, SQLException {
+
+    SamzaCalciteConnection connection = new SamzaCalciteConnection(STREAM_MODEL);
+    CalcitePrepare.Context context = Schemas.makeContext(connection,
+        connection.getCalciteRootSchema(),
+        ImmutableList.of(connection.getSchema()),
+        ImmutableMap.copyOf(defaultConfiguration()));
+
+    QueryPlanner planner = new QueryPlanner();
+    RelNode relNode = planner.getPlan(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE, context);
+    Assert.assertNotNull(relNode);
+    String s = Util.toLinux(RelOptUtil.toString(relNode));
+    Assert.assertTrue(s.contains(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED));
+  }
+
+  public static Map<CalciteConnectionProperty, String> defaultConfiguration(){
+    Map<CalciteConnectionProperty, String> map = new HashMap<CalciteConnectionProperty, String>();
+
+    map.put(CalciteConnectionProperty.CASE_SENSITIVE, "false");
+    map.put(CalciteConnectionProperty.QUOTED_CASING, Casing.UNCHANGED.name());
+    map.put(CalciteConnectionProperty.UNQUOTED_CASING, Casing.UNCHANGED.name());
+    map.put(CalciteConnectionProperty.QUOTING, Quoting.BACK_TICK.name());
+
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
new file mode 100644
index 0000000..fbb5c59
--- /dev/null
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.schema;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAvroSchemaConverter {
+  public static final String SIMPLE_RECORD_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
+      " \"type\": \"record\",\n" +
+      " \"name\": \"User\",\n" +
+      " \"fields\": [\n" +
+      "     {\"name\": \"name\", \"type\": \"string\"},\n" +
+      "     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n" +
+      "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
+      " ]\n" +
+      "}";
+
+  public static final Schema simpleRecord = new Schema.Parser().parse(SIMPLE_RECORD_SCHEMA);
+  @Test
+  public void testSimpleAvroRecord(){
+    RelDataTypeFactory relDataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    AvroSchemaConverter schemaConverter = new AvroSchemaConverter(relDataTypeFactory, simpleRecord);
+
+    RelDataType relDataType = schemaConverter.convert();
+
+    Assert.assertEquals(SqlTypeName.VARCHAR, relDataType.getField("name", false, false).getType().getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.INTEGER, relDataType.getField("favorite_number", false, false).getType().getSqlTypeName());
+    Assert.assertTrue(relDataType.getField("favorite_number", false, false).getType().isNullable());
+    Assert.assertEquals(SqlTypeName.VARCHAR, relDataType.getField("favorite_color", false, false).getType().getSqlTypeName());
+    Assert.assertTrue(relDataType.getField("favorite_color", false, false).getType().isNullable());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
deleted file mode 100644
index 3dad046..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.metadata;
-
-import org.apache.avro.Schema;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Converts an Avro schema to Calcite RelDataType.
- * <p></p>
- * <p>Inspired by parquet-mr.</p>
- */
-public class AvroSchemaConverter {
-
-  private final RelDataTypeFactory relDataTypeFactory;
-  private final Schema rootSchema;
-
-  public AvroSchemaConverter(RelDataTypeFactory relDataTypeFactory, Schema schema) {
-    this.relDataTypeFactory = relDataTypeFactory;
-    this.rootSchema = schema;
-  }
-
-  public RelDataType convert() {
-    // At the top level only records are supported
-    if (rootSchema.getType() != Schema.Type.RECORD) {
-      throw new RuntimeException(
-          String.format("Type: %s is unsupported at this level; Only Record type of supported at top level!",
-              rootSchema.getType()));
-    }
-
-    return convertRecord(rootSchema, true);
-  }
-
-  private RelDataType convertRecord(Schema recordSchema, boolean isRoot) {
-    RelDataTypeFactory.FieldInfoBuilder builder = relDataTypeFactory.builder();
-
-    for (Schema.Field field : recordSchema.getFields()) {
-      Schema fieldSchema = field.schema();
-      if (fieldSchema.getType() == Schema.Type.NULL) {
-        continue;
-      }
-
-      convertField(builder, field.name(), fieldSchema);
-    }
-
-    RelDataType record = builder.build();
-    if(isRoot) {
-      // Record at root level is treated differently.
-      return record;
-    }
-
-    return relDataTypeFactory.createStructType(record.getFieldList());
-  }
-
-  private void convertField(RelDataTypeFactory.FieldInfoBuilder builder,
-                            String fieldName,
-                            Schema fieldSchema) {
-    builder.add(fieldName, convertFieldType(fieldSchema));
-  }
-
-  private RelDataType convertFieldType(Schema elementType) {
-    Schema.Type type = elementType.getType();
-    if (type == Schema.Type.STRING) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
-    } else if (type == Schema.Type.INT) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
-    } else if (type == Schema.Type.BOOLEAN) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
-    } else if (type == Schema.Type.BYTES) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.BINARY);
-    } else if (type == Schema.Type.LONG) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
-    } else if (type == Schema.Type.DOUBLE) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
-    } else if (type == Schema.Type.FLOAT) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
-    } else if (type == Schema.Type.ARRAY) {
-      return relDataTypeFactory.createArrayType(convertFieldType(elementType), -1);
-    } else if (type == Schema.Type.RECORD) {
-      return convertRecord(elementType, false);
-    } else if (type == Schema.Type.MAP) {
-      return relDataTypeFactory.createMapType(relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
-          convertFieldType(elementType.getValueType()));
-    } else if (type == Schema.Type.FIXED) {
-      return relDataTypeFactory.createSqlType(SqlTypeName.VARBINARY, elementType.getFixedSize());
-    } else if (type == Schema.Type.UNION) {
-      List<Schema> types = elementType.getTypes();
-      List<Schema> nonNullTypes = new ArrayList<Schema>();
-      boolean foundNull = false;
-
-      for(Schema s : types) {
-        if(s.getType() == Schema.Type.NULL){
-          foundNull = true;
-        } else {
-          nonNullTypes.add(s);
-        }
-      }
-
-      if(nonNullTypes.size() > 1){
-        throw new RuntimeException("Multiple non null types in a union is not supported.");
-      } else {
-        return relDataTypeFactory.createTypeWithNullability(convertFieldType(nonNullTypes.get(0)), foundNull);
-      }
-    } else if(type == Schema.Type.ENUM) {
-      // TODO: May be there is a better way to handle enums
-      relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
-    }
-
-    return relDataTypeFactory.createSqlType(SqlTypeName.ANY);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
deleted file mode 100644
index 1dfb262..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.adapter.enumerable.EnumerableRules;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.plan.*;
-import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.rules.*;
-import org.apache.calcite.rel.stream.StreamRules;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlValidator;
-
-import java.util.List;
-
-/**
- * Streaming query planner implementation based on Calcite.
- */
-public class QueryPlanner {
-  public static final boolean COMMUTE =
-      "true".equals(
-          System.getProperties().getProperty("calcite.enable.join.commute"));
-
-  /**
-   * Whether to enable the collation trait. Some extra optimizations are
-   * possible if enabled, but queries should work either way. At some point
-   * this will become a preference, or we will run multiple phases: first
-   * disabled, then enabled.
-   */
-  private static final boolean ENABLE_COLLATION_TRAIT = true;
-
-  private static final List<RelOptRule> DEFAULT_RULES =
-      ImmutableList.of(
-          AggregateStarTableRule.INSTANCE,
-          AggregateStarTableRule.INSTANCE2,
-          TableScanRule.INSTANCE,
-          COMMUTE
-              ? JoinAssociateRule.INSTANCE
-              : ProjectMergeRule.INSTANCE,
-          FilterTableScanRule.INSTANCE,
-          ProjectFilterTransposeRule.INSTANCE,
-          FilterProjectTransposeRule.INSTANCE,
-          FilterJoinRule.FILTER_ON_JOIN,
-          AggregateExpandDistinctAggregatesRule.INSTANCE,
-          AggregateReduceFunctionsRule.INSTANCE,
-          FilterAggregateTransposeRule.INSTANCE,
-          JoinCommuteRule.INSTANCE,
-          JoinPushThroughJoinRule.RIGHT,
-          JoinPushThroughJoinRule.LEFT,
-          SortProjectTransposeRule.INSTANCE);
-
-  private static final List<RelOptRule> ENUMERABLE_RULES =
-      ImmutableList.of(
-          EnumerableRules.ENUMERABLE_JOIN_RULE,
-          EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
-          EnumerableRules.ENUMERABLE_CORRELATE_RULE,
-          EnumerableRules.ENUMERABLE_PROJECT_RULE,
-          EnumerableRules.ENUMERABLE_FILTER_RULE,
-          EnumerableRules.ENUMERABLE_AGGREGATE_RULE,
-          EnumerableRules.ENUMERABLE_SORT_RULE,
-          EnumerableRules.ENUMERABLE_LIMIT_RULE,
-          EnumerableRules.ENUMERABLE_COLLECT_RULE,
-          EnumerableRules.ENUMERABLE_UNCOLLECT_RULE,
-          EnumerableRules.ENUMERABLE_UNION_RULE,
-          EnumerableRules.ENUMERABLE_INTERSECT_RULE,
-          EnumerableRules.ENUMERABLE_MINUS_RULE,
-          EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE,
-          EnumerableRules.ENUMERABLE_VALUES_RULE,
-          EnumerableRules.ENUMERABLE_WINDOW_RULE,
-          EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE);
-
-  private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
-      ImmutableList.of(
-          ReduceExpressionsRule.PROJECT_INSTANCE,
-          ReduceExpressionsRule.FILTER_INSTANCE,
-          ReduceExpressionsRule.CALC_INSTANCE,
-          ReduceExpressionsRule.JOIN_INSTANCE,
-          ValuesReduceRule.FILTER_INSTANCE,
-          ValuesReduceRule.PROJECT_FILTER_INSTANCE,
-          ValuesReduceRule.PROJECT_INSTANCE);
-
-  /**
-   * Transform streaming query to a query plan.
-   * @param query streaming query in SQL with streaming extensions
-   * @param context query prepare context
-   * @return query plan
-   */
-  public RelNode getPlan(String query, CalcitePrepare.Context context) {
-    final JavaTypeFactory typeFactory = context.getTypeFactory();
-    final CalciteConnectionConfig config = context.config();
-
-    CalciteCatalogReader catalogReader = new CalciteCatalogReader(context.getRootSchema(),
-        false,
-        context.getDefaultSchemaPath(),
-        typeFactory);
-
-    SqlParser sqlParser = SqlParser.create(query,
-        SqlParser.configBuilder()
-            .setQuotedCasing(config.quotedCasing())
-            .setUnquotedCasing(config.unquotedCasing())
-            .setQuoting(config.quoting())
-            .build());
-
-    SqlNode sqlNode;
-
-    try {
-      sqlNode = sqlParser.parseStmt();
-    } catch (SqlParseException e) {
-      throw new RuntimeException("parse failed: " + e.getMessage(), e);
-    }
-
-    final ChainedSqlOperatorTable operatorTable =
-        new ChainedSqlOperatorTable(
-            ImmutableList.of(SqlStdOperatorTable.instance(), catalogReader));
-
-    final SqlValidator validator =
-        new SamzaSqlValidator(operatorTable, catalogReader, typeFactory);
-    validator.setIdentifierExpansion(true);
-
-    SqlNode validatedSqlNode = validator.validate(sqlNode);
-
-    final RelOptPlanner planner = createStreamingRelOptPlanner(context, null, null);
-
-    final SamzaQueryPreparingStatement preparingStmt =
-        new SamzaQueryPreparingStatement(
-            context,
-            catalogReader,
-            typeFactory,
-            context.getRootSchema(),
-            EnumerableRel.Prefer.ARRAY,
-            planner,
-            EnumerableConvention.INSTANCE);
-
-    /* TODO: Add query optimization. */
-
-    return preparingStmt.getSqlToRelConverter(validator, catalogReader).convertQuery(validatedSqlNode, false, true);
-  }
-
-  /**
-   * Creates a query planner and initializes it with a default set of
-   * rules.
-   *
-   * @param prepareContext  context for preparing a statement
-   * @param externalContext external query planning context
-   * @param costFactory     cost factory for cost based query planning
-   * @return relation query planner instance
-   */
-  protected RelOptPlanner createStreamingRelOptPlanner(final CalcitePrepare.Context prepareContext,
-                                                       org.apache.calcite.plan.Context externalContext,
-                                                       RelOptCostFactory costFactory) {
-    if (externalContext == null) {
-      externalContext = Contexts.withConfig(prepareContext.config());
-    }
-
-    final VolcanoPlanner planner =
-        new VolcanoPlanner(costFactory, externalContext);
-
-    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
-
-    if (ENABLE_COLLATION_TRAIT) {
-      planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
-      planner.registerAbstractRelationalRules();
-    }
-    RelOptUtil.registerAbstractRels(planner);
-    for (RelOptRule rule : DEFAULT_RULES) {
-      planner.addRule(rule);
-    }
-
-    /* Note: Bindable rules were removed until Calcite switches the convention of the root node to bindable. */
-
-    for (RelOptRule rule : ENUMERABLE_RULES) {
-      planner.addRule(rule);
-    }
-
-    for (RelOptRule rule : StreamRules.RULES) {
-      planner.addRule(rule);
-    }
-
-    /* Note: Constant reduction rules were removed because current Calcite implementation doesn't use them. */
-
-    return planner;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
deleted file mode 100644
index 63b1da5..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.config.CalciteConnectionConfigImpl;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.jdbc.CalciteRootSchema;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.model.ModelHandler;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.sql.*;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- * Minimal <code>org.apache.calcite.jdbc.CalciteConnection</code> implementation which enables
- * re-use of Calcite code.
- */
-public class SamzaCalciteConnection implements CalciteConnection {
-  private static final String INLINE = "inline:";
-  private final JavaTypeFactory typeFactory;
-  private final CalciteRootSchema rootSchema;
-  private String schema;
-
-  public SamzaCalciteConnection(String model) throws IOException {
-    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
-    rootSchema = CalciteSchema.createRootSchema(true);
-    new ModelHandler(this, INLINE + model);
-  }
-
-  public CalciteRootSchema getCalciteRootSchema(){
-    return rootSchema;
-  }
-
-  @Override
-  public SchemaPlus getRootSchema() {
-    return rootSchema.plus();
-  }
-
-  @Override
-  public JavaTypeFactory getTypeFactory() {
-    return typeFactory;
-  }
-
-  @Override
-  public Properties getProperties() {
-    return null;
-  }
-
-  @Override
-  public Statement createStatement() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public CallableStatement prepareCall(String sql) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public String nativeSQL(String sql) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void setAutoCommit(boolean autoCommit) throws SQLException {
-
-  }
-
-  @Override
-  public boolean getAutoCommit() throws SQLException {
-    return false;
-  }
-
-  @Override
-  public void commit() throws SQLException {
-
-  }
-
-  @Override
-  public void rollback() throws SQLException {
-
-  }
-
-  @Override
-  public void close() throws SQLException {
-
-  }
-
-  @Override
-  public boolean isClosed() throws SQLException {
-    return false;
-  }
-
-  @Override
-  public DatabaseMetaData getMetaData() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void setReadOnly(boolean readOnly) throws SQLException {
-
-  }
-
-  @Override
-  public boolean isReadOnly() throws SQLException {
-    return false;
-  }
-
-  @Override
-  public void setCatalog(String catalog) throws SQLException {
-
-  }
-
-  @Override
-  public String getCatalog() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void setTransactionIsolation(int level) throws SQLException {
-
-  }
-
-  @Override
-  public int getTransactionIsolation() throws SQLException {
-    return 0;
-  }
-
-  @Override
-  public SQLWarning getWarnings() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void clearWarnings() throws SQLException {
-
-  }
-
-  @Override
-  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Map<String, Class<?>> getTypeMap() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
-
-  }
-
-  @Override
-  public void setHoldability(int holdability) throws SQLException {
-
-  }
-
-  @Override
-  public int getHoldability() throws SQLException {
-    return 0;
-  }
-
-  @Override
-  public Savepoint setSavepoint() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Savepoint setSavepoint(String name) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void rollback(Savepoint savepoint) throws SQLException {
-
-  }
-
-  @Override
-  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
-
-  }
-
-  @Override
-  public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Clob createClob() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Blob createBlob() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public NClob createNClob() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public SQLXML createSQLXML() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public boolean isValid(int timeout) throws SQLException {
-    return false;
-  }
-
-  @Override
-  public void setClientInfo(String name, String value) throws SQLClientInfoException {
-
-  }
-
-  @Override
-  public void setClientInfo(Properties properties) throws SQLClientInfoException {
-
-  }
-
-  @Override
-  public String getClientInfo(String name) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Properties getClientInfo() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public void setSchema(String schema) throws SQLException {
-    this.schema = schema;
-  }
-
-  @Override
-  public String getSchema() throws SQLException {
-    return schema;
-  }
-
-  public void abort(Executor executor) throws SQLException {
-
-  }
-
-  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
-
-  }
-
-  public int getNetworkTimeout() throws SQLException {
-    return 0;
-  }
-
-  @Override
-  public CalciteConnectionConfig config() {
-    return new CalciteConnectionConfigImpl(new Properties());
-  }
-
-  @Override
-  public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
-    return null;
-  }
-
-  @Override
-  public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
-    return null;
-  }
-
-  @Override
-  public <T> T execute(Expression expression, Class<T> type) {
-    return null;
-  }
-
-  @Override
-  public <T> T execute(Expression expression, Type type) {
-    return null;
-  }
-
-  @Override
-  public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
-    return null;
-  }
-
-  @Override
-  public <T> T unwrap(Class<T> iface) throws SQLException {
-    return null;
-  }
-
-  @Override
-  public boolean isWrapperFor(Class<?> iface) throws SQLException {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
deleted file mode 100644
index 0721573..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.Maps;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.sql2rel.StandardConvertletTable;
-
-import java.util.List;
-import java.util.Map;
-
-public class SamzaQueryPreparingStatement extends Prepare implements RelOptTable.ViewExpander {
-  private final RelOptPlanner planner;
-  private final RexBuilder rexBuilder;
-  protected final CalciteSchema schema;
-  protected final RelDataTypeFactory typeFactory;
-  private final EnumerableRel.Prefer prefer;
-  private final Map<String, Object> internalParameters =
-      Maps.newLinkedHashMap();
-  private int expansionDepth;
-  private SqlValidator sqlValidator;
-
-  public SamzaQueryPreparingStatement(CalcitePrepare.Context context, CatalogReader catalogReader,
-                                      RelDataTypeFactory typeFactory,
-                                      CalciteSchema schema,
-                                      EnumerableRel.Prefer prefer,
-                                      RelOptPlanner planner,
-                                      Convention resultConvention) {
-    super(context, catalogReader, resultConvention);
-    this.schema = schema;
-    this.prefer = prefer;
-    this.planner = planner;
-    this.typeFactory = typeFactory;
-    this.rexBuilder = new RexBuilder(typeFactory);
-  }
-
-  @Override
-  protected PreparedResult createPreparedExplanation(RelDataType resultType, RelDataType parameterRowType, RelNode rootRel, boolean explainAsXml, SqlExplainLevel detailLevel) {
-    return null;
-  }
-
-  @Override
-  protected PreparedResult implement(RelDataType rowType, RelNode rootRel, SqlKind sqlKind) {
-    return null;
-  }
-
-  @Override
-  protected SqlToRelConverter getSqlToRelConverter(SqlValidator validator, CatalogReader catalogReader) {
-    SqlToRelConverter sqlToRelConverter =
-        new SqlToRelConverter(
-            this, validator, catalogReader, planner, rexBuilder,
-            StandardConvertletTable.INSTANCE);
-    sqlToRelConverter.setTrimUnusedFields(true);
-    return sqlToRelConverter;
-  }
-
-  @Override
-  public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
-    return null;
-  }
-
-  @Override
-  protected RelNode decorrelate(SqlToRelConverter sqlToRelConverter, SqlNode query, RelNode rootRel) {
-    return null;
-  }
-
-  @Override
-  protected void init(Class runtimeContextClass) {}
-
-  @Override
-  protected SqlValidator getSqlValidator() {
-    return null;
-  }
-
-  @Override
-  public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
-    // TODO: Implement custom view expansions
-    return super.expandView(rowType, queryString, schemaPath);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
deleted file mode 100644
index f46c1f0..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
-import org.apache.calcite.sql.validate.SqlValidatorImpl;
-
-public class SamzaSqlValidator extends SqlValidatorImpl{
-  /**
-   * Creates a validator.
-   *
-   * @param opTab         Operator table
-   * @param catalogReader Catalog reader
-   * @param typeFactory   Type factory
-   */
-  protected SamzaSqlValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
-    /* Note: We may need to define Samza specific SqlConformance instance in future. */
-    super(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT);
-  }
-
-  @Override
-  protected RelDataType getLogicalSourceRowType(
-      RelDataType sourceRowType, SqlInsert insert) {
-    return ((JavaTypeFactory) typeFactory).toSql(sourceRowType);
-  }
-
-  @Override
-  protected RelDataType getLogicalTargetRowType(
-      RelDataType targetRowType, SqlInsert insert) {
-    return ((JavaTypeFactory) typeFactory).toSql(targetRowType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java b/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
deleted file mode 100644
index 022116e..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.config.CalciteConnectionProperty;
-import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.util.Util;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class QueryPlannerTest {
-  public static final String STREAM_SCHEMA = "     {\n"
-      + "       name: 'STREAMS',\n"
-      + "       tables: [ {\n"
-      + "         type: 'custom',\n"
-      + "         name: 'ORDERS',\n"
-      + "         stream: {\n"
-      + "           stream: true\n"
-      + "         },\n"
-      + "         factory: '" + SamzaStreamTableFactory.class.getName() + "'\n"
-      + "       } ]\n"
-      + "     }\n";
-
-  public static final String STREAM_MODEL = "{\n"
-      + "  version: '1.0',\n"
-      + "  defaultSchema: 'STREAMS',\n"
-      + "   schemas: [\n"
-      + STREAM_SCHEMA
-      + "   ]\n"
-      + "}";
-
-  public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED =
-      "LogicalDelta\n" +
-          "  LogicalProject(id=[$0], product=[$1], quantity=[$2])\n" +
-          "    LogicalFilter(condition=[>($2, 5)])\n" +
-          "      EnumerableTableScan(table=[[STREAMS, ORDERS]])";
-  public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE =
-      "select stream * from orders where quantity > 5";
-
-  @Test
-  public void testQueryPlanner() throws IOException, SQLException {
-
-    SamzaCalciteConnection connection = new SamzaCalciteConnection(STREAM_MODEL);
-    CalcitePrepare.Context context = Schemas.makeContext(connection,
-        connection.getCalciteRootSchema(),
-        ImmutableList.of(connection.getSchema()),
-        ImmutableMap.copyOf(defaultConfiguration()));
-
-    QueryPlanner planner = new QueryPlanner();
-    RelNode relNode = planner.getPlan(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE, context);
-    Assert.assertNotNull(relNode);
-    String s = Util.toLinux(RelOptUtil.toString(relNode));
-    Assert.assertTrue(s.contains(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED));
-  }
-
-  public static Map<CalciteConnectionProperty, String> defaultConfiguration(){
-    Map<CalciteConnectionProperty, String> map = new HashMap<CalciteConnectionProperty, String>();
-
-    map.put(CalciteConnectionProperty.CASE_SENSITIVE, "false");
-    map.put(CalciteConnectionProperty.QUOTED_CASING, Casing.UNCHANGED.name());
-    map.put(CalciteConnectionProperty.UNQUOTED_CASING, Casing.UNCHANGED.name());
-    map.put(CalciteConnectionProperty.QUOTING, Quoting.BACK_TICK.name());
-
-    return map;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java b/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java
deleted file mode 100644
index f757d8f..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.*;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.Map;
-
-public class SamzaStreamTableFactory implements TableFactory<Table> {
-    public Table create(SchemaPlus schema, String name,
-                        Map<String, Object> operand, RelDataType rowType) {
-        final RelProtoDataType protoRowType = new RelProtoDataType() {
-            public RelDataType apply(RelDataTypeFactory a0) {
-                return a0.builder()
-                        .add("id", SqlTypeName.INTEGER)
-                        .add("product", SqlTypeName.VARCHAR, 10)
-                        .add("quantity", SqlTypeName.INTEGER)
-                        .build();
-            }
-        };
-        final ImmutableList<Object[]> rows = ImmutableList.of(
-                new Object[]{1, "paint", 10},
-                new Object[]{2, "paper", 5});
-
-        return new StreamableTable() {
-            public Table stream() {
-                return new OrdersTable(protoRowType, rows);
-            }
-
-            public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-                return protoRowType.apply(typeFactory);
-            }
-
-            public Statistic getStatistic() {
-                return Statistics.UNKNOWN;
-            }
-
-            public Schema.TableType getJdbcTableType() {
-                return Schema.TableType.TABLE;
-            }
-        };
-    }
-
-    public static class OrdersTable implements ScannableTable {
-        private final RelProtoDataType protoRowType;
-        private final ImmutableList<Object[]> rows;
-
-        public OrdersTable(RelProtoDataType protoRowType,
-                           ImmutableList<Object[]> rows) {
-            this.protoRowType = protoRowType;
-            this.rows = rows;
-        }
-
-        public Enumerable<Object[]> scan(DataContext root) {
-            return Linq4j.asEnumerable(rows);
-        }
-
-        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-            return protoRowType.apply(typeFactory);
-        }
-
-        public Statistic getStatistic() {
-            return Statistics.UNKNOWN;
-        }
-
-        public Schema.TableType getJdbcTableType() {
-            return Schema.TableType.STREAM;
-        }
-    }
-}