You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/16 00:43:10 UTC

[1/2] beam git commit: [BEAM-2442] BeamSql surface api test.

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 2e9253401 -> df5859d44


[BEAM-2442] BeamSql surface api test.

The surface api of BeamSql includes the following:
- BeamSql
- BeamSqlCli
- BeamSqlEnv
- All the classes in package org.apache.beam.dsls.sql.schema

Calcite related methods are encapsulated into CalciteUtils(which is not part of surface api) to avoid exposure.

Created a new BeamSqlTable interface which abstracts the beam table concept.

RelDataType, RelProtoDataType are all removed from surface api, BeamSqlRecordType is the only class which represents the schema of a table.
java.sql.Types is used to represent sql type instead of Calcite SqlTypeName.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7bcbad53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7bcbad53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7bcbad53

Branch: refs/heads/DSL_SQL
Commit: 7bcbad53ebe6bbad6c1dd5b8f2b6ff8f9fbf6d26
Parents: 2e92534
Author: James Xu <xu...@gmail.com>
Authored: Wed Jun 14 23:47:10 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 15 17:39:48 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  |  19 ++--
 .../org/apache/beam/dsls/sql/BeamSqlCli.java    |  13 +--
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    |  52 ++++++++-
 .../beam/dsls/sql/example/BeamSqlExample.java   |   8 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  11 +-
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |   4 +-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |   7 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |   6 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |   3 +-
 .../beam/dsls/sql/schema/BaseBeamTable.java     |  70 +-----------
 .../dsls/sql/schema/BeamPCollectionTable.java   |  10 +-
 .../beam/dsls/sql/schema/BeamSqlRecordType.java |  42 +-------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   5 +-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |   6 +-
 .../beam/dsls/sql/schema/BeamSqlTable.java      |  52 +++++++++
 .../beam/dsls/sql/schema/BeamTableUtils.java    |   3 +-
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |   9 +-
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   |  12 ++-
 .../beam/dsls/sql/schema/package-info.java      |   1 -
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  10 +-
 .../dsls/sql/schema/text/BeamTextTable.java     |   9 +-
 .../transform/BeamAggregationTransforms.java    |   8 +-
 .../beam/dsls/sql/utils/CalciteUtils.java       | 108 +++++++++++++++++++
 .../beam/dsls/sql/utils/package-info.java       |  22 ++++
 .../beam/dsls/sql/BeamSqlApiSurfaceTest.java    |  59 ++++++++++
 .../interpreter/BeamSqlFnExecutorTestBase.java  |   3 +-
 .../beam/dsls/sql/planner/BasePlanner.java      |  22 ++--
 .../BeamPlannerAggregationSubmitTest.java       |  12 ++-
 .../dsls/sql/planner/MockedBeamSqlTable.java    |  11 +-
 .../sql/schema/BeamPCollectionTableTest.java    |   7 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   3 +-
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  19 ++--
 .../sql/schema/text/BeamTextCSVTableTest.java   |  12 ++-
 .../transform/BeamAggregationTransformTest.java |   4 +-
 .../schema/transform/BeamTransformBaseTest.java |   3 +-
 35 files changed, 433 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 0d6454b..04fe055 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.dsls.sql;
 
+import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
+import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable;
+
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -71,6 +74,7 @@ p.run().waitUntilFinish();
  */
 @Experimental
 public class BeamSql {
+
   /**
    * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
    *
@@ -101,7 +105,8 @@ public class BeamSql {
   /**
    * A {@link PTransform} representing an execution plan for a SQL query.
    */
-  public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+  private static class QueryTransform extends
+      PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
     private String sqlQuery;
     public QueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
@@ -114,13 +119,13 @@ public class BeamSql {
         PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
         BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
 
-        BeamSqlEnv.registerTable(sourceTag.getId(),
-            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema().toRelDataType()));
+        registerTable(sourceTag.getId(),
+            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
       }
 
       BeamRelNode beamRelNode = null;
       try {
-        beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
+        beamRelNode = planner.convertToBeamRel(sqlQuery);
       } catch (ValidationException | RelConversionException | SqlParseException e) {
         throw new IllegalStateException(e);
       }
@@ -137,7 +142,7 @@ public class BeamSql {
    * A {@link PTransform} representing an execution plan for a SQL query referencing
    * a single table.
    */
-  public static class SimpleQueryTransform
+  private static class SimpleQueryTransform
       extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
     private String sqlQuery;
     public SimpleQueryTransform(String sqlQuery) {
@@ -152,8 +157,8 @@ public class BeamSql {
     public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
       SqlNode sqlNode;
       try {
-        sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery);
-        BeamSqlEnv.planner.getPlanner().close();
+        sqlNode = planner.parseQuery(sqlQuery);
+        planner.getPlanner().close();
       } catch (SqlParseException e) {
         throw new IllegalStateException(e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
index a55f655..dbf9a59 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.dsls.sql;
 
+import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
+
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
@@ -25,9 +27,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
 
 /**
  * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
@@ -38,9 +37,8 @@ public class BeamSqlCli {
   /**
    * Returns a human readable representation of the query execution plan.
    */
-  public static String explainQuery(String sqlString)
-      throws ValidationException, RelConversionException, SqlParseException {
-    BeamRelNode exeTree = BeamSqlEnv.planner.convertToBeamRel(sqlString);
+  public static String explainQuery(String sqlString) throws Exception {
+    BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
     String beamPlan = RelOptUtil.toString(exeTree);
     return beamPlan;
   }
@@ -63,8 +61,7 @@ public class BeamSqlCli {
   public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
       throws Exception{
     PCollection<BeamSqlRow> resultStream =
-        BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline);
+        planner.compileBeamPipeline(sqlStatement, basePipeline);
     return resultStream;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index af6c007..d7715c7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -17,9 +17,21 @@
  */
 package org.apache.beam.dsls.sql;
 
+import java.io.Serializable;
+
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.tools.Frameworks;
 
@@ -30,8 +42,8 @@ import org.apache.calcite.tools.Frameworks;
  * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
  */
 public class BeamSqlEnv {
-  public static SchemaPlus schema;
-  public static BeamQueryPlanner planner;
+  static SchemaPlus schema;
+  static BeamQueryPlanner planner;
 
   static {
     schema = Frameworks.createRootSchema(true);
@@ -50,7 +62,7 @@ public class BeamSqlEnv {
    *
    */
   public static void registerTable(String tableName, BaseBeamTable table) {
-    schema.add(tableName, table);
+    schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
     planner.getSourceTables().put(tableName, table);
   }
 
@@ -60,4 +72,38 @@ public class BeamSqlEnv {
   public static BaseBeamTable findTable(String tableName){
     return planner.getSourceTables().get(tableName);
   }
+
+  private static class BeamCalciteTable implements ScannableTable, Serializable {
+    private BeamSqlRecordType beamSqlRecordType;
+    public BeamCalciteTable(BeamSqlRecordType beamSqlRecordType) {
+      this.beamSqlRecordType = beamSqlRecordType;
+    }
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return CalciteUtils.toCalciteRecordType(this.beamSqlRecordType)
+          .apply(BeamQueryPlanner.TYPE_FACTORY);
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      // not used as Beam SQL uses its own execution engine
+      return null;
+    }
+
+    /**
+     * Not used {@link Statistic} to optimize the plan.
+     */
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.UNKNOWN;
+    }
+
+    /**
+     * all sources are treated as TABLE in Beam SQL.
+     */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 36e1aa9..8ba785b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.example;
 
+import java.sql.Types;
 import org.apache.beam.dsls.sql.BeamSql;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,9 +48,9 @@ public class BeamSqlExample {
 
     //define the input row format
     BeamSqlRecordType type = new BeamSqlRecordType();
-    type.addField("c1", SqlTypeName.INTEGER);
-    type.addField("c2", SqlTypeName.VARCHAR);
-    type.addField("c3", SqlTypeName.DOUBLE);
+    type.addField("c1", Types.INTEGER);
+    type.addField("c2", Types.VARCHAR);
+    type.addField("c3", Types.DOUBLE);
     BeamSqlRow row = new BeamSqlRow(type);
     row.addField(0, 1);
     row.addField(1, "row");

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 9951536..828dcec 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
@@ -109,13 +110,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
         stageName + "_aggregation",
         Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
             new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
-                BeamSqlRecordType.from(input.getRowType()))))
+                CalciteUtils.toBeamRecordType(input.getRowType()))))
         .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
 
     PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            BeamSqlRecordType.from(getRowType()), getAggCallList())));
-    mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
+            CalciteUtils.toBeamRecordType(getRowType()), getAggCallList())));
+    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
 
     return mergedStream;
   }
@@ -124,7 +125,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
    * Type of sub-rowrecord used as Group-By keys.
    */
   private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSqlRecordType inputRecordType = BeamSqlRecordType.from(relDataType);
+    BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType);
     BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
@@ -141,7 +142,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   private BeamSqlRecordType exAggFieldsSchema() {
     BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType();
     for (AggregateCall ac : getAggCallList()) {
-      typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName());
+      typeOfAggFields.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
     }
     return typeOfAggFields;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 4c5e113..dc13646 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -20,10 +20,10 @@ package org.apache.beam.dsls.sql.rel;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -63,7 +63,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
 
     PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
         ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
+    filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
 
     return filterStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 9b7492b..937a834 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -22,10 +22,10 @@ import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -72,8 +72,9 @@ public class BeamProjectRel extends Project implements BeamRelNode {
     BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
     PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
-        .of(new BeamSqlProjectFn(getRelTypeName(), executor, BeamSqlRecordType.from(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
+        .of(new BeamSqlProjectFn(getRelTypeName(), executor,
+            CalciteUtils.toBeamRecordType(rowType))));
+    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
 
     return projectStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index ff8bbcf..7632e6a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -25,9 +25,9 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -149,7 +149,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
 
     PCollection<BeamSqlRow> orderedStream = rawStream.apply(
         "flatten", Flatten.<BeamSqlRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
+    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
 
     return orderedStream;
   }
@@ -191,7 +191,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
       for (int i = 0; i < fieldsIndices.size(); i++) {
         int fieldIndex = fieldsIndices.get(i);
         int fieldRet = 0;
-        SqlTypeName fieldType = row1.getDataType().getFieldsType().get(fieldIndex);
+        SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
         // whether NULL should be ordered first or last(compared to non-null values) depends on
         // what user specified in SQL(NULLS FIRST/NULLS LAST)
         if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index 9a1887f..61d9713 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -28,6 +28,7 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -65,7 +66,7 @@ public class BeamValuesRel extends Values implements BeamRelNode {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from(this.getRowType());
+    BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
       BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
       for (int i = 0; i < tuple.size(); i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 333bb10..6d49bcc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -18,77 +18,17 @@
 package org.apache.beam.dsls.sql.schema;
 
 import java.io.Serializable;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.Schema.TableType;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
 
 /**
  * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
  */
-public abstract class BaseBeamTable implements ScannableTable, Serializable {
-  private RelDataType relDataType;
-
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
   protected BeamSqlRecordType beamSqlRecordType;
-
-  public BaseBeamTable(RelProtoDataType protoRowType) {
-    this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
-    this.beamSqlRecordType = BeamSqlRecordType.from(relDataType);
-  }
-
-  /**
-   * In Beam SQL, there's no difference between a batch query and a streaming
-   * query. {@link BeamIOType} is used to validate the sources.
-   */
-  public abstract BeamIOType getSourceType();
-
-  /**
-   * create a {@code PCollection<BeamSqlRow>} from source.
-   *
-   */
-  public abstract PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
-
-  /**
-   * create a {@code IO.write()} instance to write to target.
-   *
-   */
-  public abstract PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
-
-  @Override
-  public Enumerable<Object[]> scan(DataContext root) {
-    // not used as Beam SQL uses its own execution engine
-    return null;
-  }
-
-  @Override
-  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-    return relDataType;
+  public BaseBeamTable(BeamSqlRecordType beamSqlRecordType) {
+    this.beamSqlRecordType = beamSqlRecordType;
   }
 
-  /**
-   * Not used {@link Statistic} to optimize the plan.
-   */
-  @Override
-  public Statistic getStatistic() {
-    return Statistics.UNKNOWN;
+  @Override public BeamSqlRecordType getRecordType() {
+    return beamSqlRecordType;
   }
-
-  /**
-   * all sources are treated as TABLE in Beam SQL.
-   */
-  @Override
-  public TableType getJdbcTableType() {
-    return TableType.TABLE;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index f679ed7..ecd0d67 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelProtoDataType;
 
 /**
  * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
@@ -32,12 +31,13 @@ public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
   private PCollection<BeamSqlRow> upstream;
 
-  protected BeamPCollectionTable(RelProtoDataType protoRowType) {
-    super(protoRowType);
+  protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
   }
 
-  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, RelProtoDataType protoRowType){
-    this(protoRowType);
+  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
+      BeamSqlRecordType beamSqlRecordType){
+    this(beamSqlRecordType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
     this.upstream = upstream;

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
index 7da08cc..08ba39f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -20,12 +20,6 @@ package org.apache.beam.dsls.sql.schema;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
  * Field type information in {@link BeamSqlRow}.
@@ -33,41 +27,13 @@ import org.apache.calcite.sql.type.SqlTypeName;
  */
 public class BeamSqlRecordType implements Serializable {
   private List<String> fieldsName = new ArrayList<>();
-  private List<SqlTypeName> fieldsType = new ArrayList<>();
+  private List<Integer> fieldsType = new ArrayList<>();
 
-  /**
-   * Generate from {@link RelDataType} which is used to create table.
-   */
-  public static BeamSqlRecordType from(RelDataType tableInfo) {
-    BeamSqlRecordType record = new BeamSqlRecordType();
-    for (RelDataTypeField f : tableInfo.getFieldList()) {
-      record.fieldsName.add(f.getName());
-      record.fieldsType.add(f.getType().getSqlTypeName());
-    }
-    return record;
-  }
-
-  public void addField(String fieldName, SqlTypeName fieldType) {
+  public void addField(String fieldName, Integer fieldType) {
     fieldsName.add(fieldName);
     fieldsType.add(fieldType);
   }
 
-  /**
-   * Create an instance of {@link RelDataType} so it can be used to create a table.
-   */
-  public RelProtoDataType toRelDataType() {
-    return new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a) {
-        FieldInfoBuilder builder = a.builder();
-        for (int idx = 0; idx < fieldsName.size(); ++idx) {
-          builder.add(fieldsName.get(idx), fieldsType.get(idx));
-        }
-        return builder.build();
-      }
-    };
-  }
-
   public int size() {
     return fieldsName.size();
   }
@@ -80,11 +46,11 @@ public class BeamSqlRecordType implements Serializable {
     this.fieldsName = fieldsName;
   }
 
-  public List<SqlTypeName> getFieldsType() {
+  public List<Integer> getFieldsType() {
     return fieldsType;
   }
 
-  public void setFieldsType(List<SqlTypeName> fieldsType) {
+  public void setFieldsType(List<Integer> fieldsType) {
     this.fieldsType = fieldsType;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index eb311cf..3a67303 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -81,7 +82,7 @@ public class BeamSqlRow implements Serializable {
       }
     }
 
-    SqlTypeName fieldType = dataType.getFieldsType().get(index);
+    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
     switch (fieldType) {
       case INTEGER:
         if (!(fieldValue instanceof Integer)) {
@@ -201,7 +202,7 @@ public class BeamSqlRow implements Serializable {
     }
 
     Object fieldValue = dataValues.get(fieldIdx);
-    SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
+    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, fieldIdx);
 
     switch (fieldType) {
       case INTEGER:

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index bcbd481..e86fb3f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -23,6 +23,8 @@ import java.io.OutputStream;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -62,7 +64,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
         continue;
       }
 
-      switch (value.getDataType().getFieldsType().get(idx)) {
+      switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
         case INTEGER:
           intCoder.encode(value.getInteger(idx), outStream);
           break;
@@ -117,7 +119,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
         continue;
       }
 
-      switch (tableSchema.getFieldsType().get(idx)) {
+      switch (CalciteUtils.getFieldType(tableSchema, idx)) {
         case INTEGER:
           record.addField(idx, intCoder.decode(inStream));
           break;

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
new file mode 100644
index 0000000..986decb
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * This interface defines a Beam Sql Table.
+ */
+public interface BeamSqlTable {
+  /**
+   * In Beam SQL, there's no difference between a batch query and a streaming
+   * query. {@link BeamIOType} is used to validate the sources.
+   */
+  BeamIOType getSourceType();
+
+  /**
+   * create a {@code PCollection<BeamSqlRow>} from source.
+   *
+   */
+  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+
+  /**
+   * create a {@code IO.write()} instance to write to target.
+   *
+   */
+   PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+
+  /**
+   * Get the schema info of the table.
+   */
+   BeamSqlRecordType getRecordType();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index 1c1db91..79a9cb2 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.math.BigDecimal;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
 import org.apache.commons.csv.CSVFormat;
@@ -78,7 +79,7 @@ public final class BeamTableUtils {
       return;
     }
 
-    SqlTypeName columnType = row.getDataType().getFieldsType().get(idx);
+    SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
     // auto-casting for numberics
     if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
         || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index f8c2553..39cf8d8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.commons.csv.CSVFormat;
 
 /**
@@ -38,14 +37,14 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
   private CSVFormat csvFormat;
-  public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
       List<String> topics) {
-    this(protoRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+    this(beamSqlRecordType, bootstrapServers, topics, CSVFormat.DEFAULT);
   }
 
-  public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
       List<String> topics, CSVFormat format) {
-    super(protoRowType, bootstrapServers, topics);
+    super(beamSqlRecordType, bootstrapServers, topics);
     this.csvFormat = format;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index c43fa2c..f27014e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -18,11 +18,14 @@
 package org.apache.beam.dsls.sql.schema.kafka;
 
 import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -32,7 +35,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
@@ -47,13 +49,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   private List<String> topics;
   private Map<String, Object> configUpdates;
 
-  protected BeamKafkaTable(RelProtoDataType protoRowType) {
-    super(protoRowType);
+  protected BeamKafkaTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
   }
 
-  public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
+  public BeamKafkaTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
       List<String> topics) {
-    super(protoRowType);
+    super(beamSqlRecordType);
     this.bootstrapServers = bootstrapServers;
     this.topics = topics;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
index 47de06f..4c41826 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 /**
  * define table schema, to map with Beam IO components.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index e575eee..41a786f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.commons.csv.CSVFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,13 +46,13 @@ public class BeamTextCSVTable extends BeamTextTable {
   /**
    * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
    */
-  public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern)  {
-    this(protoDataType, filePattern, CSVFormat.DEFAULT);
+  public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern)  {
+    this(beamSqlRecordType, filePattern, CSVFormat.DEFAULT);
   }
 
-  public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern,
+  public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern,
       CSVFormat csvFormat) {
-    super(protoDataType, filePattern);
+    super(beamSqlRecordType, filePattern);
     this.csvFormat = csvFormat;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
index 3353761..525c210 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -22,19 +22,16 @@ import java.io.Serializable;
 
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 
 /**
  * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
  */
 public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
   protected String filePattern;
-  protected BeamTextTable(RelProtoDataType protoRowType) {
-    super(protoRowType);
-  }
 
-  protected BeamTextTable(RelProtoDataType protoDataType, String filePattern) {
-    super(protoDataType);
+  protected BeamTextTable(BeamSqlRecordType beamSqlRecordType, String filePattern) {
+    super(beamSqlRecordType);
     this.filePattern = filePattern;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 51d3e89..e804b94 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -27,6 +27,7 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -163,7 +164,7 @@ public class BeamAggregationTransforms implements Serializable{
         //verify it's supported.
         verifySupportedAggregation(ac);
 
-        aggDataType.addField(ac.name, ac.type.getSqlTypeName());
+        aggDataType.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
 
         SqlAggFunction aggFn = ac.getAggregation();
         switch (aggFn.getName()) {
@@ -178,7 +179,7 @@ public class BeamAggregationTransforms implements Serializable{
         case "AVG":
           int refIndex = ac.getArgList().get(0);
           aggElementExpressions.add(new BeamSqlInputRefExpression(
-              sourceRowRecordType.getFieldsType().get(refIndex), refIndex));
+              CalciteUtils.getFieldType(sourceRowRecordType, refIndex), refIndex));
           if ("AVG".equals(aggFn.getName())) {
             hasAvg = true;
           }
@@ -191,7 +192,8 @@ public class BeamAggregationTransforms implements Serializable{
       }
       // add a COUNT holder if only have AVG
       if (hasAvg && !hasCount) {
-        aggDataType.addField("__COUNT", SqlTypeName.BIGINT);
+        aggDataType.addField("__COUNT",
+            CalciteUtils.toJavaType(SqlTypeName.BIGINT));
 
         aggFunctions.add("COUNT");
         aggElementExpressions.add(BeamSqlPrimitive.<Long>of(SqlTypeName.BIGINT, 1L));

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
new file mode 100644
index 0000000..46b4911
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.utils;
+
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility methods for Calcite related operations.
+ */
+public class CalciteUtils {
+  private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>();
+  private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>();
+  static {
+    JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER);
+    JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
+    JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
+
+    for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) {
+      CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey());
+    }
+  }
+
+  /**
+   * Get the corresponding {@code SqlTypeName} for an integer sql type.
+   */
+  public static SqlTypeName toCalciteType(int type) {
+    return JAVA_TO_CALCITE_MAPPING.get(type);
+  }
+
+  /**
+   * Get the integer sql type from Calcite {@code SqlTypeName}.
+   */
+  public static Integer toJavaType(SqlTypeName typeName) {
+    return CALCITE_TO_JAVA_MAPPING.get(typeName);
+  }
+
+  /**
+   * Get the {@code SqlTypeName} for the specified column of a table.
+   * @return
+   */
+  public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
+    return toCalciteType(schema.getFieldsType().get(index));
+  }
+
+  /**
+   * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table.
+   */
+  public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) {
+    BeamSqlRecordType record = new BeamSqlRecordType();
+    for (RelDataTypeField f : tableInfo.getFieldList()) {
+      record.getFieldsName().add(f.getName());
+      record.getFieldsType().add(toJavaType(f.getType().getSqlTypeName()));
+    }
+    return record;
+  }
+
+  /**
+   * Create an instance of {@code RelDataType} so it can be used to create a table.
+   */
+  public static RelProtoDataType toCalciteRecordType(final BeamSqlRecordType that) {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
+          builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
+        }
+        return builder.build();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java
new file mode 100644
index 0000000..b5c861a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes.
+ */
+package org.apache.beam.dsls.sql.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
new file mode 100644
index 0000000..922931c
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Surface test for BeamSql api.
+ */
+@RunWith(JUnit4.class)
+public class BeamSqlApiSurfaceTest {
+  @Test
+  public void testSdkApiSurface() throws Exception {
+
+    @SuppressWarnings("unchecked")
+    final Set<String> allowed =
+        ImmutableSet.of(
+            "org.apache.beam",
+            "org.joda.time",
+            "org.apache.commons.csv");
+
+    ApiSurface surface = ApiSurface
+        .ofClass(BeamSqlCli.class)
+        .includingClass(BeamSql.class)
+        .includingClass(BeamSqlEnv.class)
+        .includingPackage("org.apache.beam.dsls.sql.schema",
+            getClass().getClassLoader())
+        .pruningPrefix("java")
+        .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*Test")
+        .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*TestBase");
+
+    assertThat(surface, containsOnlyPackages(allowed));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
index d83ca8f..739d548 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -25,6 +25,7 @@ import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
 import org.apache.beam.dsls.sql.planner.BeamRuleSets;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -69,7 +70,7 @@ public class BeamSqlFnExecutorTestBase {
         .add("price", SqlTypeName.DOUBLE)
         .add("order_time", SqlTypeName.BIGINT).build();
 
-    beamRecordType = BeamSqlRecordType.from(relDataType);
+    beamRecordType = CalciteUtils.toBeamRecordType(relDataType);
     record = new BeamSqlRow(beamRecordType);
 
     record.addField(0, 1234567L);

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 7f69345..2c5b555 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -17,15 +17,18 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable;
+
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
+
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -40,9 +43,9 @@ import org.junit.BeforeClass;
 public class BasePlanner {
   @BeforeClass
   public static void prepareClass() {
-    BeamSqlEnv.registerTable("ORDER_DETAILS", getTable());
-    BeamSqlEnv.registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
-    BeamSqlEnv.registerTable("SUB_ORDER_RAM", getTable());
+    registerTable("ORDER_DETAILS", getTable());
+    registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+    registerTable("SUB_ORDER_RAM", getTable());
   }
 
   private static BaseBeamTable getTable() {
@@ -54,8 +57,8 @@ public class BasePlanner {
       }
     };
 
-    BeamSqlRecordType dataType = BeamSqlRecordType.from(
-        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+    BeamSqlRecordType dataType = CalciteUtils
+        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
     BeamSqlRow row1 = new BeamSqlRow(dataType);
     row1.addField(0, 12345L);
     row1.addField(1, 0);
@@ -80,7 +83,7 @@ public class BasePlanner {
     row4.addField(2, 20.5);
     row4.addField(3, new Date());
 
-    return new MockedBeamSqlTable(protoRowType).withInputRecords(
+    return new MockedBeamSqlTable(dataType).withInputRecords(
         Arrays.asList(row1, row2, row3, row4));
   }
 
@@ -93,10 +96,13 @@ public class BasePlanner {
       }
     };
 
+    BeamSqlRecordType dataType = CalciteUtils
+        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+
     Map<String, Object> consumerPara = new HashMap<String, Object>();
     consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
 
-    return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
+    return new BeamKafkaCSVTable(dataType, bootstrapServer, Arrays.asList(topic))
         .updateConsumerProperties(consumerPara);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
index e12eca2..f98517b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -69,8 +70,8 @@ public class BeamPlannerAggregationSubmitTest {
       }
     };
 
-    BeamSqlRecordType dataType = BeamSqlRecordType.from(
-        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+    BeamSqlRecordType dataType = CalciteUtils
+        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
     BeamSqlRow row1 = new BeamSqlRow(dataType);
     row1.addField(0, 12345L);
     row1.addField(1, 1);
@@ -91,7 +92,7 @@ public class BeamPlannerAggregationSubmitTest {
     row4.addField(1, 0);
     row4.addField(2, format.parse("2017-01-01 03:04:05"));
 
-    return new MockedBeamSqlTable(protoRowType).withInputRecords(
+    return new MockedBeamSqlTable(dataType).withInputRecords(
         Arrays.asList(row1
             , row2, row3, row4
             ));
@@ -108,7 +109,10 @@ public class BeamPlannerAggregationSubmitTest {
             .add("size", SqlTypeName.BIGINT).build();
       }
     };
-    return new MockedBeamSqlTable(protoRowType);
+    BeamSqlRecordType dataType = CalciteUtils
+        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+
+    return new MockedBeamSqlTable(dataType);
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
index 185e95a..f651f6a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
@@ -26,6 +26,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -49,8 +50,8 @@ public class MockedBeamSqlTable extends BaseBeamTable {
 
   private List<BeamSqlRow> inputRecords;
 
-  public MockedBeamSqlTable(RelProtoDataType protoRowType) {
-    super(protoRowType);
+  public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
   }
 
   public MockedBeamSqlTable withInputRecords(List<BeamSqlRow> inputRecords){
@@ -102,8 +103,8 @@ public class MockedBeamSqlTable extends BaseBeamTable {
     };
 
     List<BeamSqlRow> rows = new ArrayList<>();
-    BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from(
-        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+    BeamSqlRecordType beamSQLRecordType = CalciteUtils
+        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
     int fieldCount = beamSQLRecordType.size();
 
     for (int i = fieldCount * 2; i < args.length; i += fieldCount) {
@@ -113,7 +114,7 @@ public class MockedBeamSqlTable extends BaseBeamTable {
       }
       rows.add(row);
     }
-    return new MockedBeamSqlTable(protoRowType).withInputRecords(rows);
+    return new MockedBeamSqlTable(beamSQLRecordType).withInputRecords(rows);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
index a085eae..8dc8439 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
@@ -21,6 +21,7 @@ import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.BasePlanner;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PBegin;
@@ -49,14 +50,16 @@ public class BeamPCollectionTableTest extends BasePlanner{
             .add("c2", SqlTypeName.VARCHAR).build();
       }
     };
+    BeamSqlRecordType dataType = CalciteUtils
+        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
 
-    BeamSqlRow row = new BeamSqlRow(BeamSqlRecordType.from(
+    BeamSqlRow row = new BeamSqlRow(CalciteUtils.toBeamRecordType(
         protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)));
     row.addField(0, 1);
     row.addField(1, "hello world.");
     PCollection<BeamSqlRow> inputStream = PBegin.in(pipeline).apply(Create.of(row));
     BeamSqlEnv.registerTable("COLLECTION_TABLE",
-        new BeamPCollectionTable(inputStream, protoRowType));
+        new BeamPCollectionTable(inputStream, dataType));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index c087825..b358fe1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
 
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
@@ -56,7 +57,7 @@ public class BeamSqlRowCoderTest {
       }
     };
 
-    BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from(
+    BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(
         protoRowType.apply(new JavaTypeFactoryImpl(
         RelDataTypeSystem.DEFAULT)));
     BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
index fc19d40..9cd0915 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -91,16 +92,14 @@ public class BeamKafkaCSVTableTest {
   }
 
   private static BeamSqlRecordType genRowType() {
-    return BeamSqlRecordType.from(
-        new RelProtoDataType() {
-          @Override public RelDataType apply(RelDataTypeFactory a0) {
-            return a0.builder()
-                .add("order_id", SqlTypeName.BIGINT)
-                .add("site_id", SqlTypeName.INTEGER)
-                .add("price", SqlTypeName.DOUBLE)
-                .build();
-          }
-        }.apply(BeamQueryPlanner.TYPE_FACTORY));
+    return CalciteUtils.toBeamRecordType(new RelProtoDataType() {
+
+      @Override public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE).build();
+      }
+    }.apply(BeamQueryPlanner.TYPE_FACTORY));
   }
 
   private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index d782aad..176df46 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -35,6 +35,7 @@ import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -80,19 +81,20 @@ public class BeamTextCSVTableTest {
   private static File writerTargetFile;
 
   @Test public void testBuildIOReader() {
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildRowType(),
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
         readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
     PAssert.that(rows).containsInAnyOrder(testDataRows);
     pipeline.run();
   }
 
   @Test public void testBuildIOWriter() {
-    new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
-        .apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath())
+    new BeamTextCSVTable(buildBeamSqlRecordType(),
+        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
+        .apply(new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath())
             .buildIOWriter());
     pipeline.run();
 
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildRowType(),
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
         writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
 
     // confirm the two reads match
@@ -166,7 +168,7 @@ public class BeamTextCSVTableTest {
   }
 
   private static BeamSqlRecordType buildBeamSqlRecordType() {
-    return BeamSqlRecordType.from(buildRelDataType());
+    return CalciteUtils.toBeamRecordType(buildRelDataType());
   }
 
   private static BeamSqlRow buildRow(Object[] data) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index 5cbbe41..388a344 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -21,11 +21,13 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.testing.PAssert;
@@ -431,7 +433,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return BeamSqlRecordType.from(builder.build());
+    return CalciteUtils.toBeamRecordType(builder.build());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
index ef85347..2e91405 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -72,7 +73,7 @@ public class BeamTransformBaseTest {
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return BeamSqlRecordType.from(builder.build());
+    return CalciteUtils.toBeamRecordType(builder.build());
   }
 
   /**


[2/2] beam git commit: [BEAM-2442] This closes #3357

Posted by ta...@apache.org.
[BEAM-2442] This closes #3357


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df5859d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df5859d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df5859d4

Branch: refs/heads/DSL_SQL
Commit: df5859d44dcd930a36e4996acd586037516edbb0
Parents: 2e92534 7bcbad5
Author: Tyler Akidau <ta...@apache.org>
Authored: Thu Jun 15 17:42:02 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 15 17:42:02 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  |  19 ++--
 .../org/apache/beam/dsls/sql/BeamSqlCli.java    |  13 +--
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    |  52 ++++++++-
 .../beam/dsls/sql/example/BeamSqlExample.java   |   8 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  11 +-
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |   4 +-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |   7 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |   6 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |   3 +-
 .../beam/dsls/sql/schema/BaseBeamTable.java     |  70 +-----------
 .../dsls/sql/schema/BeamPCollectionTable.java   |  10 +-
 .../beam/dsls/sql/schema/BeamSqlRecordType.java |  42 +-------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   5 +-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |   6 +-
 .../beam/dsls/sql/schema/BeamSqlTable.java      |  52 +++++++++
 .../beam/dsls/sql/schema/BeamTableUtils.java    |   3 +-
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |   9 +-
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   |  12 ++-
 .../beam/dsls/sql/schema/package-info.java      |   1 -
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  10 +-
 .../dsls/sql/schema/text/BeamTextTable.java     |   9 +-
 .../transform/BeamAggregationTransforms.java    |   8 +-
 .../beam/dsls/sql/utils/CalciteUtils.java       | 108 +++++++++++++++++++
 .../beam/dsls/sql/utils/package-info.java       |  22 ++++
 .../beam/dsls/sql/BeamSqlApiSurfaceTest.java    |  59 ++++++++++
 .../interpreter/BeamSqlFnExecutorTestBase.java  |   3 +-
 .../beam/dsls/sql/planner/BasePlanner.java      |  22 ++--
 .../BeamPlannerAggregationSubmitTest.java       |  12 ++-
 .../dsls/sql/planner/MockedBeamSqlTable.java    |  11 +-
 .../sql/schema/BeamPCollectionTableTest.java    |   7 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   3 +-
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  19 ++--
 .../sql/schema/text/BeamTextCSVTableTest.java   |  12 ++-
 .../transform/BeamAggregationTransformTest.java |   4 +-
 .../schema/transform/BeamTransformBaseTest.java |   3 +-
 35 files changed, 433 insertions(+), 212 deletions(-)
----------------------------------------------------------------------