You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/15 18:41:59 UTC

[4/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.

[BEAM-2740] Hide BeamSqlEnv.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49aad927
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49aad927
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49aad927

Branch: refs/heads/DSL_SQL
Commit: 49aad927d4d9cf58c30c04641c766a62d44f44b7
Parents: 9eec6a0
Author: James Xu <xu...@gmail.com>
Authored: Wed Aug 9 18:54:54 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Aug 15 11:40:39 2017 -0700

----------------------------------------------------------------------
 .../sdk/extensions/sql/BeamRecordSqlType.java   | 185 ++++++++
 .../apache/beam/sdk/extensions/sql/BeamSql.java | 113 ++---
 .../beam/sdk/extensions/sql/BeamSqlCli.java     |  65 ---
 .../beam/sdk/extensions/sql/BeamSqlEnv.java     | 127 ------
 .../sdk/extensions/sql/BeamSqlRecordHelper.java | 217 +++++++++
 .../beam/sdk/extensions/sql/BeamSqlUdf.java     |  41 ++
 .../extensions/sql/example/BeamSqlExample.java  |   2 +-
 .../sdk/extensions/sql/impl/BeamSqlCli.java     |  65 +++
 .../sdk/extensions/sql/impl/BeamSqlEnv.java     | 135 ++++++
 .../sdk/extensions/sql/impl/package-info.java   |  22 +
 .../sql/impl/planner/BeamQueryPlanner.java      |   9 +-
 .../sql/impl/rel/BeamAggregationRel.java        |   4 +-
 .../extensions/sql/impl/rel/BeamFilterRel.java  |   2 +-
 .../extensions/sql/impl/rel/BeamIOSinkRel.java  |   6 +-
 .../sql/impl/rel/BeamIOSourceRel.java           |   6 +-
 .../sql/impl/rel/BeamIntersectRel.java          |   2 +-
 .../extensions/sql/impl/rel/BeamJoinRel.java    |   4 +-
 .../extensions/sql/impl/rel/BeamMinusRel.java   |   2 +-
 .../extensions/sql/impl/rel/BeamProjectRel.java |   2 +-
 .../extensions/sql/impl/rel/BeamRelNode.java    |   5 +-
 .../sql/impl/rel/BeamSetOperatorRelBase.java    |   2 +-
 .../extensions/sql/impl/rel/BeamSortRel.java    |   4 +-
 .../extensions/sql/impl/rel/BeamUnionRel.java   |   2 +-
 .../extensions/sql/impl/rel/BeamValuesRel.java  |   6 +-
 .../sql/impl/schema/BaseBeamTable.java          |  35 ++
 .../extensions/sql/impl/schema/BeamIOType.java  |  28 ++
 .../sql/impl/schema/BeamPCollectionTable.java   |  63 +++
 .../sql/impl/schema/BeamSqlTable.java           |  54 +++
 .../sql/impl/schema/BeamTableUtils.java         | 118 +++++
 .../impl/schema/kafka/BeamKafkaCSVTable.java    | 109 +++++
 .../sql/impl/schema/kafka/BeamKafkaTable.java   | 109 +++++
 .../sql/impl/schema/kafka/package-info.java     |  22 +
 .../sql/impl/schema/package-info.java           |  22 +
 .../sql/impl/schema/text/BeamTextCSVTable.java  |  70 +++
 .../schema/text/BeamTextCSVTableIOReader.java   |  58 +++
 .../schema/text/BeamTextCSVTableIOWriter.java   |  58 +++
 .../sql/impl/schema/text/BeamTextTable.java     |  41 ++
 .../sql/impl/schema/text/package-info.java      |  22 +
 .../transform/BeamAggregationTransforms.java    |   4 +-
 .../sql/impl/transform/BeamJoinTransforms.java  |   4 +-
 .../sql/impl/transform/BeamSqlProjectFn.java    |   4 +-
 .../extensions/sql/impl/utils/CalciteUtils.java |   2 +-
 .../extensions/sql/schema/BaseBeamTable.java    |  34 --
 .../sdk/extensions/sql/schema/BeamIOType.java   |  28 --
 .../sql/schema/BeamPCollectionTable.java        |  62 ---
 .../sql/schema/BeamRecordSqlType.java           | 185 --------
 .../sql/schema/BeamSqlRecordHelper.java         | 217 ---------
 .../sdk/extensions/sql/schema/BeamSqlTable.java |  53 ---
 .../sdk/extensions/sql/schema/BeamSqlUdf.java   |  41 --
 .../extensions/sql/schema/BeamTableUtils.java   | 117 -----
 .../sql/schema/kafka/BeamKafkaCSVTable.java     | 109 -----
 .../sql/schema/kafka/BeamKafkaTable.java        | 109 -----
 .../sql/schema/kafka/package-info.java          |  22 -
 .../sdk/extensions/sql/schema/package-info.java |  22 -
 .../sql/schema/text/BeamTextCSVTable.java       |  70 ---
 .../schema/text/BeamTextCSVTableIOReader.java   |  58 ---
 .../schema/text/BeamTextCSVTableIOWriter.java   |  58 ---
 .../sql/schema/text/BeamTextTable.java          |  41 --
 .../sql/schema/text/package-info.java           |  22 -
 .../extensions/sql/BeamSqlApiSurfaceTest.java   |  12 +-
 .../sql/BeamSqlDslAggregationTest.java          |   1 -
 .../beam/sdk/extensions/sql/BeamSqlDslBase.java |   1 -
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java  |   1 -
 .../extensions/sql/BeamSqlDslProjectTest.java   |   1 -
 .../extensions/sql/BeamSqlDslUdfUdafTest.java   |   2 -
 .../beam/sdk/extensions/sql/TestUtils.java      |   1 -
 .../interpreter/BeamSqlFnExecutorTestBase.java  |   2 +-
 .../extensions/sql/impl/rel/BaseRelTest.java    |  34 ++
 .../sql/impl/rel/BeamIntersectRelTest.java      |   9 +-
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    |  23 +-
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  |  25 +-
 .../BeamJoinRelUnboundedVsUnboundedTest.java    |  19 +-
 .../sql/impl/rel/BeamMinusRelTest.java          |   9 +-
 .../impl/rel/BeamSetOperatorRelBaseTest.java    |   9 +-
 .../sql/impl/rel/BeamSortRelTest.java           |  17 +-
 .../sql/impl/rel/BeamUnionRelTest.java          |   9 +-
 .../sql/impl/rel/BeamValuesRelTest.java         |  11 +-
 .../sql/impl/schema/BeamSqlRowCoderTest.java    |  77 ++++
 .../schema/kafka/BeamKafkaCSVTableTest.java     | 107 +++++
 .../impl/schema/text/BeamTextCSVTableTest.java  | 176 +++++++
 .../transform/BeamAggregationTransformTest.java | 453 +++++++++++++++++++
 .../schema/transform/BeamTransformBaseTest.java |  97 ++++
 ...mSqlBuiltinFunctionsIntegrationTestBase.java |   2 +-
 ...amSqlComparisonOperatorsIntegrationTest.java |   2 +-
 .../extensions/sql/mock/MockedBoundedTable.java |   4 +-
 .../sdk/extensions/sql/mock/MockedTable.java    |   4 +-
 .../sql/mock/MockedUnboundedTable.java          |   4 +-
 .../sql/schema/BeamSqlRowCoderTest.java         |  76 ----
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 107 -----
 .../sql/schema/text/BeamTextCSVTableTest.java   | 176 -------
 .../transform/BeamAggregationTransformTest.java | 453 -------------------
 .../schema/transform/BeamTransformBaseTest.java |  97 ----
 92 files changed, 2575 insertions(+), 2545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
new file mode 100644
index 0000000..5269867
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} with SQL types.
+ *
+ * <p>Limited SQL types are supported now, visit
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for more details.
+ *
+ */
+public class BeamRecordSqlType extends BeamRecordType {
+  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  public List<Integer> fieldTypes;
+
+  protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+  }
+
+  private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes
+      , List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+    this.fieldTypes = fieldTypes;
+  }
+
+  public static BeamRecordSqlType create(List<String> fieldNames,
+      List<Integer> fieldTypes) {
+    if (fieldNames.size() != fieldTypes.size()) {
+      throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
+    }
+    List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
+    for (int idx = 0; idx < fieldTypes.size(); ++idx) {
+      switch (fieldTypes.get(idx)) {
+      case Types.INTEGER:
+        fieldCoders.add(BigEndianIntegerCoder.of());
+        break;
+      case Types.SMALLINT:
+        fieldCoders.add(ShortCoder.of());
+        break;
+      case Types.TINYINT:
+        fieldCoders.add(ByteCoder.of());
+        break;
+      case Types.DOUBLE:
+        fieldCoders.add(DoubleCoder.of());
+        break;
+      case Types.FLOAT:
+        fieldCoders.add(FloatCoder.of());
+        break;
+      case Types.DECIMAL:
+        fieldCoders.add(BigDecimalCoder.of());
+        break;
+      case Types.BIGINT:
+        fieldCoders.add(BigEndianLongCoder.of());
+        break;
+      case Types.VARCHAR:
+      case Types.CHAR:
+        fieldCoders.add(StringUtf8Coder.of());
+        break;
+      case Types.TIME:
+        fieldCoders.add(TimeCoder.of());
+        break;
+      case Types.DATE:
+      case Types.TIMESTAMP:
+        fieldCoders.add(DateCoder.of());
+        break;
+      case Types.BOOLEAN:
+        fieldCoders.add(BooleanCoder.of());
+        break;
+
+      default:
+        throw new UnsupportedOperationException(
+            "Data type: " + fieldTypes.get(idx) + " not supported yet!");
+      }
+    }
+    return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
+  }
+
+  @Override
+  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    if (null == fieldValue) {// no need to do type check for NULL value
+      return;
+    }
+
+    int fieldType = fieldTypes.get(index);
+    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+    if (javaClazz == null) {
+      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  public List<Integer> getFieldTypes() {
+    return fieldTypes;
+  }
+
+  public Integer getFieldTypeByIndex(int index){
+    return fieldTypes.get(index);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj != null && obj instanceof BeamRecordSqlType) {
+      BeamRecordSqlType ins = (BeamRecordSqlType) obj;
+      return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames());
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "BeamRecordSqlType [fieldNames=" + getFieldNames()
+        + ", fieldTypes=" + fieldTypes + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index bf6a9c0..34355fb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -17,13 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import com.google.auto.value.AutoValue;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -94,10 +92,7 @@ public class BeamSql {
    * </ul>
    */
   public static QueryTransform query(String sqlQuery) {
-    return QueryTransform.builder()
-        .setSqlEnv(new BeamSqlEnv())
-        .setSqlQuery(sqlQuery)
-        .build();
+    return new QueryTransform(sqlQuery);
   }
 
   /**
@@ -109,10 +104,7 @@ public class BeamSql {
    * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>.
    */
   public static SimpleQueryTransform simpleQuery(String sqlQuery) {
-    return SimpleQueryTransform.builder()
-        .setSqlEnv(new BeamSqlEnv())
-        .setSqlQuery(sqlQuery)
-        .build();
+    return new SimpleQueryTransform(sqlQuery);
   }
 
   /**
@@ -121,28 +113,22 @@ public class BeamSql {
    * <p>The table names in the input {@code PCollectionTuple} are only valid during the current
    * query.
    */
-  @AutoValue
-  public abstract static class QueryTransform extends
+  public static class QueryTransform extends
       PTransform<PCollectionTuple, PCollection<BeamRecord>> {
-    abstract BeamSqlEnv getSqlEnv();
-    abstract String getSqlQuery();
+    private BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+    private String sqlQuery;
 
-    static Builder builder() {
-      return new AutoValue_BeamSql_QueryTransform.Builder();
-    }
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setSqlQuery(String sqlQuery);
-      abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
-      abstract QueryTransform build();
+    public QueryTransform(String sqlQuery) {
+      this.sqlQuery = sqlQuery;
     }
 
     /**
      * register a UDF function used in this query.
+     *
+     * <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDF in BeamSql.
      */
      public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
-       getSqlEnv().registerUdf(functionName, clazz);
+       beamSqlEnv.registerUdf(functionName, clazz);
        return this;
      }
      /**
@@ -150,7 +136,7 @@ public class BeamSql {
       * Note, {@link SerializableFunction} must have a constructor without arguments.
       */
       public QueryTransform withUdf(String functionName, SerializableFunction sfn){
-        getSqlEnv().registerUdf(functionName, sfn);
+        beamSqlEnv.registerUdf(functionName, sfn);
         return this;
       }
 
@@ -158,7 +144,7 @@ public class BeamSql {
       * register a {@link CombineFn} as UDAF function used in this query.
       */
      public QueryTransform withUdaf(String functionName, CombineFn combineFn){
-       getSqlEnv().registerUdaf(functionName, combineFn);
+       beamSqlEnv.registerUdaf(functionName, combineFn);
        return this;
      }
 
@@ -168,13 +154,13 @@ public class BeamSql {
 
       BeamRelNode beamRelNode = null;
       try {
-        beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery());
+        beamRelNode = beamSqlEnv.getPlanner().convertToBeamRel(sqlQuery);
       } catch (ValidationException | RelConversionException | SqlParseException e) {
         throw new IllegalStateException(e);
       }
 
       try {
-        return beamRelNode.buildBeamPipeline(input, getSqlEnv());
+        return beamRelNode.buildBeamPipeline(input, beamSqlEnv);
       } catch (Exception e) {
         throw new IllegalStateException(e);
       }
@@ -186,7 +172,7 @@ public class BeamSql {
         PCollection<BeamRecord> sourceStream = (PCollection<BeamRecord>) input.get(sourceTag);
         BeamRecordCoder sourceCoder = (BeamRecordCoder) sourceStream.getCoder();
 
-        getSqlEnv().registerTable(sourceTag.getId(),
+        beamSqlEnv.registerTable(sourceTag.getId(),
             new BeamPCollectionTable(sourceStream,
                 (BeamRecordSqlType) sourceCoder.getRecordType()));
       }
@@ -197,53 +183,47 @@ public class BeamSql {
    * A {@link PTransform} representing an execution plan for a SQL query referencing
    * a single table.
    */
-  @AutoValue
-  public abstract static class SimpleQueryTransform
+  public static class SimpleQueryTransform
       extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> {
     private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
-    abstract BeamSqlEnv getSqlEnv();
-    abstract String getSqlQuery();
+    private QueryTransform delegate;
 
-    static Builder builder() {
-      return new AutoValue_BeamSql_SimpleQueryTransform.Builder();
+    public SimpleQueryTransform(String sqlQuery) {
+      this.delegate = new QueryTransform(sqlQuery);
     }
 
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setSqlQuery(String sqlQuery);
-      abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
-      abstract SimpleQueryTransform build();
+    /**
+     * register a UDF function used in this query.
+     *
+     * <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDAF in BeamSql.
+     */
+    public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+      delegate.withUdf(functionName, clazz);
+      return this;
     }
 
     /**
-     * register a UDF function used in this query.
+     * register {@link SerializableFunction} as a UDF function used in this query.
+     * Note, {@link SerializableFunction} must have a constructor without arguments.
      */
-     public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
-       getSqlEnv().registerUdf(functionName, clazz);
-       return this;
-     }
-     /**
-      * register {@link SerializableFunction} as a UDF function used in this query.
-      * Note, {@link SerializableFunction} must have a constructor without arguments.
-      */
-      public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){
-        getSqlEnv().registerUdf(functionName, sfn);
-        return this;
-      }
+    public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){
+      delegate.withUdf(functionName, sfn);
+      return this;
+    }
 
-      /**
-       * register a {@link CombineFn} as UDAF function used in this query.
-       */
-      public SimpleQueryTransform withUdaf(String functionName, CombineFn combineFn){
-        getSqlEnv().registerUdaf(functionName, combineFn);
-        return this;
-      }
+    /**
+     * register a {@link CombineFn} as UDAF function used in this query.
+     */
+    public SimpleQueryTransform withUdaf(String functionName, CombineFn combineFn){
+      delegate.withUdaf(functionName, combineFn);
+      return this;
+    }
 
     private void validateQuery() {
       SqlNode sqlNode;
       try {
-        sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery());
-        getSqlEnv().planner.getPlanner().close();
+        sqlNode = delegate.beamSqlEnv.getPlanner().parseQuery(delegate.sqlQuery);
+        delegate.beamSqlEnv.getPlanner().getPlanner().close();
       } catch (SqlParseException e) {
         throw new IllegalStateException(e);
       }
@@ -264,10 +244,7 @@ public class BeamSql {
     public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) {
       validateQuery();
       return PCollectionTuple.of(new TupleTag<BeamRecord>(PCOLLECTION_TABLE_NAME), input)
-          .apply(QueryTransform.builder()
-              .setSqlEnv(getSqlEnv())
-              .setSqlQuery(getSqlQuery())
-              .build());
+          .apply(delegate);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
deleted file mode 100644
index a43808e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptUtil;
-
-/**
- * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
- */
-@Experimental
-public class BeamSqlCli {
-  /**
-   * Returns a human readable representation of the query execution plan.
-   */
-  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
-    BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
-    String beamPlan = RelOptUtil.toString(exeTree);
-    return beamPlan;
-  }
-
-  /**
-   * compile SQL, and return a {@link Pipeline}.
-   */
-  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
-      throws Exception{
-    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
-        .as(PipelineOptions.class); // FlinkPipelineOptions.class
-    options.setJobName("BeamPlanCreator");
-    Pipeline pipeline = Pipeline.create(options);
-
-    return compilePipeline(sqlStatement, pipeline, sqlEnv);
-  }
-
-  /**
-   * compile SQL, and return a {@link Pipeline}.
-   */
-  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline,
-      BeamSqlEnv sqlEnv) throws Exception{
-    PCollection<BeamRecord> resultStream =
-        sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
-    return resultStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
deleted file mode 100644
index 79f2b32..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.tools.Frameworks;
-
-/**
- * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
- *
- * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
- * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
- */
-public class BeamSqlEnv implements Serializable{
-  transient SchemaPlus schema;
-  transient BeamQueryPlanner planner;
-
-  public BeamSqlEnv() {
-    schema = Frameworks.createRootSchema(true);
-    planner = new BeamQueryPlanner(schema);
-  }
-
-  /**
-   * Register a UDF function which can be used in SQL expression.
-   */
-  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
-    schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
-  }
-
-  /**
-   * register {@link SerializableFunction} as a UDF function which can be used in SQL expression.
-   * Note, {@link SerializableFunction} must have a constructor without arguments.
-   */
-  public void registerUdf(String functionName, SerializableFunction sfn) {
-    schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply"));
-  }
-
-  /**
-   * Register a {@link CombineFn} as UDAF function which can be used in GROUP-BY expression.
-   */
-  public void registerUdaf(String functionName, CombineFn combineFn) {
-    schema.add(functionName, new UdafImpl(combineFn));
-  }
-
-  /**
-   * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
-   *
-   */
-  public void registerTable(String tableName, BaseBeamTable table) {
-    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
-    planner.getSourceTables().put(tableName, table);
-  }
-
-  /**
-   * Find {@link BaseBeamTable} by table name.
-   */
-  public BaseBeamTable findTable(String tableName){
-    return planner.getSourceTables().get(tableName);
-  }
-
-  private static class BeamCalciteTable implements ScannableTable, Serializable {
-    private BeamRecordSqlType beamSqlRowType;
-    public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) {
-      this.beamSqlRowType = beamSqlRowType;
-    }
-    @Override
-    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
-          .apply(BeamQueryPlanner.TYPE_FACTORY);
-    }
-
-    @Override
-    public Enumerable<Object[]> scan(DataContext root) {
-      // not used as Beam SQL uses its own execution engine
-      return null;
-    }
-
-    /**
-     * Not used {@link Statistic} to optimize the plan.
-     */
-    @Override
-    public Statistic getStatistic() {
-      return Statistics.UNKNOWN;
-    }
-
-    /**
-     * all sources are treated as TABLE in Beam SQL.
-     */
-    @Override
-    public Schema.TableType getJdbcTableType() {
-      return Schema.TableType.TABLE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
new file mode 100644
index 0000000..870165d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * A {@link Coder} encodes {@link BeamRecord}.
+ */
+@Experimental
+public class BeamSqlRecordHelper {
+
+  public static BeamRecordSqlType getSqlRecordType(BeamRecord record) {
+    return (BeamRecordSqlType) record.getDataType();
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Short}.
+   */
+  public static class ShortCoder extends CustomCoder<Short> {
+    private static final ShortCoder INSTANCE = new ShortCoder();
+
+    public static ShortCoder of() {
+      return INSTANCE;
+    }
+
+    private ShortCoder() {
+    }
+
+    @Override
+    public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
+      new DataOutputStream(outStream).writeShort(value);
+    }
+
+    @Override
+    public Short decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readShort();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+  /**
+   * {@link Coder} for Java type {@link Float}, it's stored as {@link BigDecimal}.
+   */
+  public static class FloatCoder extends CustomCoder<Float> {
+    private static final FloatCoder INSTANCE = new FloatCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    public static FloatCoder of() {
+      return INSTANCE;
+    }
+
+    private FloatCoder() {
+    }
+
+    @Override
+    public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Float decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).floatValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+  /**
+   * {@link Coder} for Java type {@link Double}, it's stored as {@link BigDecimal}.
+   */
+  public static class DoubleCoder extends CustomCoder<Double> {
+    private static final DoubleCoder INSTANCE = new DoubleCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    public static DoubleCoder of() {
+      return INSTANCE;
+    }
+
+    private DoubleCoder() {
+    }
+
+    @Override
+    public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Double decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).doubleValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link GregorianCalendar}, it's stored as {@link Long}.
+   */
+  public static class TimeCoder extends CustomCoder<GregorianCalendar> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final TimeCoder INSTANCE = new TimeCoder();
+
+    public static TimeCoder of() {
+      return INSTANCE;
+    }
+
+    private TimeCoder() {
+    }
+
+    @Override
+    public void encode(GregorianCalendar value, OutputStream outStream)
+        throws CoderException, IOException {
+      longCoder.encode(value.getTime().getTime(), outStream);
+    }
+
+    @Override
+    public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
+      GregorianCalendar calendar = new GregorianCalendar();
+      calendar.setTime(new Date(longCoder.decode(inStream)));
+      return calendar;
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+  /**
+   * {@link Coder} for Java type {@link Date}, it's stored as {@link Long}.
+   */
+  public static class DateCoder extends CustomCoder<Date> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final DateCoder INSTANCE = new DateCoder();
+
+    public static DateCoder of() {
+      return INSTANCE;
+    }
+
+    private DateCoder() {
+    }
+
+    @Override
+    public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
+      longCoder.encode(value.getTime(), outStream);
+    }
+
+    @Override
+    public Date decode(InputStream inStream) throws CoderException, IOException {
+      return new Date(longCoder.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Boolean}.
+   */
+  public static class BooleanCoder extends CustomCoder<Boolean> {
+    private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+    public static BooleanCoder of() {
+      return INSTANCE;
+    }
+
+    private BooleanCoder() {
+    }
+
+    @Override
+    public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
+      new DataOutputStream(outStream).writeBoolean(value);
+    }
+
+    @Override
+    public Boolean decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readBoolean();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
new file mode 100644
index 0000000..d4828e7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction {
+ *   public String eval(
+ *       &#64;Parameter(name = "s") String s,
+ *       &#64;Parameter(name = "n", optional = true) Integer n) {
+ *     return s.substring(0, n == null ? 1 : n);
+ *   }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+  String UDF_METHOD = "eval";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 91251cf..0c5dae1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -21,8 +21,8 @@ import java.sql.Types;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java
new file mode 100644
index 0000000..5c7d920
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+  /**
+   * Returns a human readable representation of the query execution plan.
+   */
+  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode exeTree = sqlEnv.getPlanner().convertToBeamRel(sqlString);
+    String beamPlan = RelOptUtil.toString(exeTree);
+    return beamPlan;
+  }
+
+  /**
+   * compile SQL, and return a {@link Pipeline}.
+   */
+  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+      throws Exception{
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+        .as(PipelineOptions.class);
+    options.setJobName("BeamPlanCreator");
+    Pipeline pipeline = Pipeline.create(options);
+
+    return compilePipeline(sqlStatement, pipeline, sqlEnv);
+  }
+
+  /**
+   * compile SQL, and return a {@link Pipeline}.
+   */
+  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline,
+      BeamSqlEnv sqlEnv) throws Exception{
+    PCollection<BeamRecord> resultStream = sqlEnv.getPlanner()
+        .compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
+    return resultStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
new file mode 100644
index 0000000..fcc9079
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
+ * {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions,
+ * and a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
+ */
+public class BeamSqlEnv implements Serializable{
+  transient SchemaPlus schema;
+  transient BeamQueryPlanner planner;
+
+  public BeamSqlEnv() {
+    schema = Frameworks.createRootSchema(true);
+    planner = new BeamQueryPlanner(schema);
+  }
+
+  /**
+   * Register a UDF function which can be used in SQL expression.
+   */
+  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+    schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
+  }
+
+  /**
+   * Register {@link SerializableFunction} as a UDF function which can be used in SQL expression.
+   * Note, {@link SerializableFunction} must have a constructor without arguments.
+   */
+  public void registerUdf(String functionName, SerializableFunction sfn) {
+    schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply"));
+  }
+
+  /**
+   * Register a UDAF function which can be used in GROUP-BY expression.
+   * See {@link org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
+   */
+  public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
+    schema.add(functionName, new UdafImpl(combineFn));
+  }
+
+  /**
+   * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+   *
+   */
+  public void registerTable(String tableName, BeamSqlTable table) {
+    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+    planner.getSourceTables().put(tableName, table);
+  }
+
+  /**
+   * Find {@link BaseBeamTable} by table name.
+   */
+  public BeamSqlTable findTable(String tableName){
+    return planner.getSourceTables().get(tableName);
+  }
+
+  private static class BeamCalciteTable implements ScannableTable, Serializable {
+    private BeamRecordSqlType beamSqlRowType;
+    public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) {
+      this.beamSqlRowType = beamSqlRowType;
+    }
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
+          .apply(BeamQueryPlanner.TYPE_FACTORY);
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      // not used as Beam SQL uses its own execution engine
+      return null;
+    }
+
+    /**
+     * Not used {@link Statistic} to optimize the plan.
+     */
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.UNKNOWN;
+    }
+
+    /**
+     * all sources are treated as TABLE in Beam SQL.
+     */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
+
+  public BeamQueryPlanner getPlanner() {
+    return planner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java
new file mode 100644
index 0000000..de237d6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation classes of BeamSql.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index b421bc3..410c783 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -23,10 +23,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -67,7 +68,7 @@ public class BeamQueryPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
 
   protected final Planner planner;
-  private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
+  private Map<String, BeamSqlTable> sourceTables = new HashMap<>();
 
   public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
@@ -156,7 +157,7 @@ public class BeamQueryPlanner {
     return planner.validate(sqlNode);
   }
 
-  public Map<String, BaseBeamTable> getSourceTables() {
+  public Map<String, BeamSqlTable> getSourceTables() {
     return sourceTables;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 4b557f9..e49e79c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -21,10 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
index 8fe5be4..9d36a47 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 1e3eb4c..7bb08c2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import com.google.common.base.Joiner;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -65,7 +65,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
+    BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
 
     upstream.apply(stageName, targetTable.buildIOWriter());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 254f990..1e4f506 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import com.google.common.base.Joiner;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -53,7 +53,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
       return sourceStream;
     } else {
       //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
-      BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
+      BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
       return sourceTable.buildIOReader(inputPCollections.getPipeline())
           .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index 5919329..1ffb636 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 5ac9575..cc26aa6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -25,10 +25,10 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index b55252a..6f5dff2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
index b1ff629..501feb3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index b8b4293..9e8d46d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -33,6 +33,7 @@ public interface BeamRelNode extends RelNode {
    * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
    * algorithm.
    */
-  PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+  PCollection<BeamRecord> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
       throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index f9cbf4f..a1f3e2b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.io.Serializable;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 80f3c97..d658638 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -25,9 +25,9 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index 63ebdf3..85d676e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index c4caff3..d684294 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -21,10 +21,10 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java
new file mode 100644
index 0000000..73e0863
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+  protected BeamRecordSqlType beamSqlRowType;
+  public BaseBeamTable(BeamRecordSqlType beamSqlRowType) {
+    this.beamSqlRowType = beamSqlRowType;
+  }
+
+  @Override public BeamRecordSqlType getRowType() {
+    return beamSqlRowType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java
new file mode 100644
index 0000000..5ced467
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type as a source IO, determined whether it's a STREAMING process, or batch
+ * process.
+ */
+public enum BeamIOType implements Serializable {
+  BOUNDED, UNBOUNDED;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..31e60e0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * then a downstream query can query directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+  private BeamIOType ioType;
+  private transient PCollection<BeamRecord> upstream;
+
+  protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  public BeamPCollectionTable(PCollection<BeamRecord> upstream,
+      BeamRecordSqlType beamSqlRowType){
+    this(beamSqlRowType);
+    ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
+        ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
+    this.upstream = upstream;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return ioType;
+  }
+
+  @Override
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+    return upstream;
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+    throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java
new file mode 100644
index 0000000..46fba59
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * This interface defines a Beam Sql Table.
+ */
+public interface BeamSqlTable {
+  /**
+   * In Beam SQL, there's no difference between a batch query and a streaming
+   * query. {@link BeamIOType} is used to validate the sources.
+   */
+  BeamIOType getSourceType();
+
+  /**
+   * create a {@code PCollection<BeamSqlRow>} from source.
+   *
+   */
+  PCollection<BeamRecord> buildIOReader(Pipeline pipeline);
+
+  /**
+   * create a {@code IO.write()} instance to write to target.
+   *
+   */
+   PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter();
+
+  /**
+   * Get the schema info of the table.
+   */
+   BeamRecordSqlType getRowType();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
new file mode 100644
index 0000000..6f7f09b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+  public static BeamRecord csvLine2BeamSqlRow(
+      CSVFormat csvFormat,
+      String line,
+      BeamRecordSqlType beamRecordSqlType) {
+    List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount());
+    try (StringReader reader = new StringReader(line)) {
+      CSVParser parser = csvFormat.parse(reader);
+      CSVRecord rawRecord = parser.getRecords().get(0);
+
+      if (rawRecord.size() != beamRecordSqlType.getFieldCount()) {
+        throw new IllegalArgumentException(String.format(
+            "Expect %d fields, but actually %d",
+            beamRecordSqlType.getFieldCount(), rawRecord.size()
+        ));
+      } else {
+        for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) {
+          String raw = rawRecord.get(idx);
+          fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("decodeRecord failed!", e);
+    }
+    return new BeamRecord(beamRecordSqlType, fieldsValue);
+  }
+
+  public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
+    StringWriter writer = new StringWriter();
+    try (CSVPrinter printer = csvFormat.print(writer)) {
+      for (int i = 0; i < row.getFieldCount(); i++) {
+        printer.print(row.getFieldValue(i).toString());
+      }
+      printer.println();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("encodeRecord failed!", e);
+    }
+    return writer.toString();
+  }
+
+  public static Object autoCastField(int fieldType, Object rawObj) {
+    if (rawObj == null) {
+      return null;
+    }
+
+    SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
+    // auto-casting for numberics
+    if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+        || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+      String raw = rawObj.toString();
+      switch (columnType) {
+        case TINYINT:
+          return Byte.valueOf(raw);
+        case SMALLINT:
+          return Short.valueOf(raw);
+        case INTEGER:
+          return Integer.valueOf(raw);
+        case BIGINT:
+          return Long.valueOf(raw);
+        case FLOAT:
+          return Float.valueOf(raw);
+        case DOUBLE:
+          return Double.valueOf(raw);
+        default:
+          throw new UnsupportedOperationException(
+              String.format("Column type %s is not supported yet!", columnType));
+      }
+    } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+      // convert NlsString to String
+      if (rawObj instanceof NlsString) {
+        return ((NlsString) rawObj).getValue();
+      } else {
+        return rawObj;
+      }
+    } else {
+      return rawObj;
+    }
+  }
+}