You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/09 16:41:00 UTC

[2/3] beam git commit: [BEAM-2745] add BeamRecordSqlType.getFieldTypeByIndex()

[BEAM-2745] add BeamRecordSqlType.getFieldTypeByIndex()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c76129a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c76129a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c76129a

Branch: refs/heads/DSL_SQL
Commit: 7c76129a02c19595b257cff6efdc1fd0e637815d
Parents: 6628674
Author: James Xu <xu...@gmail.com>
Authored: Tue Aug 8 14:53:18 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Aug 9 09:34:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/values/BeamRecord.java  |  21 ++-
 .../apache/beam/sdk/values/BeamRecordType.java  |  25 +--
 .../apache/beam/sdk/extensions/sql/BeamSql.java |   4 +-
 .../beam/sdk/extensions/sql/BeamSqlEnv.java     |   6 +-
 .../extensions/sql/example/BeamSqlExample.java  |   4 +-
 .../sql/impl/rel/BeamAggregationRel.java        |  16 +-
 .../extensions/sql/impl/rel/BeamJoinRel.java    |  10 +-
 .../extensions/sql/impl/rel/BeamValuesRel.java  |   6 +-
 .../transform/BeamAggregationTransforms.java    |  25 +--
 .../sql/impl/transform/BeamJoinTransforms.java  |  22 +--
 .../sql/impl/transform/BeamSqlProjectFn.java    |   8 +-
 .../extensions/sql/impl/utils/CalciteUtils.java |  16 +-
 .../extensions/sql/schema/BaseBeamTable.java    |   6 +-
 .../sql/schema/BeamPCollectionTable.java        |   4 +-
 .../sql/schema/BeamRecordSqlType.java           | 185 +++++++++++++++++++
 .../sql/schema/BeamSqlRecordHelper.java         |   4 +-
 .../sql/schema/BeamSqlRecordType.java           | 175 ------------------
 .../sdk/extensions/sql/schema/BeamSqlTable.java |   2 +-
 .../extensions/sql/schema/BeamTableUtils.java   |  14 +-
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |  14 +-
 .../sql/schema/kafka/BeamKafkaTable.java        |   6 +-
 .../sql/schema/text/BeamTextCSVTable.java       |   6 +-
 .../schema/text/BeamTextCSVTableIOReader.java   |   6 +-
 .../schema/text/BeamTextCSVTableIOWriter.java   |   6 +-
 .../sql/schema/text/BeamTextTable.java          |   4 +-
 .../sql/BeamSqlDslAggregationTest.java          |  14 +-
 .../beam/sdk/extensions/sql/BeamSqlDslBase.java |   6 +-
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java  |  10 +-
 .../extensions/sql/BeamSqlDslProjectTest.java   |  10 +-
 .../extensions/sql/BeamSqlDslUdfUdafTest.java   |   6 +-
 .../beam/sdk/extensions/sql/TestUtils.java      |  14 +-
 .../interpreter/BeamSqlFnExecutorTestBase.java  |   4 +-
 ...mSqlBuiltinFunctionsIntegrationTestBase.java |   6 +-
 ...amSqlComparisonOperatorsIntegrationTest.java |   4 +-
 .../extensions/sql/mock/MockedBoundedTable.java |   6 +-
 .../sdk/extensions/sql/mock/MockedTable.java    |   4 +-
 .../sql/mock/MockedUnboundedTable.java          |   4 +-
 .../sql/schema/BeamSqlRowCoderTest.java         |   2 +-
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |   4 +-
 .../sql/schema/text/BeamTextCSVTableTest.java   |   4 +-
 .../transform/BeamAggregationTransformTest.java |  10 +-
 .../schema/transform/BeamTransformBaseTest.java |   8 +-
 42 files changed, 368 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 6e4bd4c..a3ede3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -37,7 +37,20 @@ public class BeamRecord implements Serializable {
   private List<Object> dataValues;
   private BeamRecordType dataType;
 
-  public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) {
+  /**
+   * Creates a BeamRecord.
+   * @param dataType type of the record
+   * @param rawDataValues values of the record; its size must match the size of
+   *                      the {@code BeamRecordType}. Intended to allow null (all fields
+   *                      null), but NOTE(review): the size check below NPEs on null.
+   */
+  public BeamRecord(BeamRecordType dataType, List<Object> rawDataValues) {
+    if (dataType.getFieldNames().size() != rawDataValues.size()) {
+      throw new IllegalArgumentException(
+          "Field count in BeamRecordType(" + dataType.getFieldNames().size()
+              + ") and rawDataValues(" + rawDataValues.size() + ") must match!");
+    }
+
     this.dataType = dataType;
     this.dataValues = new ArrayList<>(dataType.size());
 
@@ -46,7 +59,7 @@ public class BeamRecord implements Serializable {
     }
 
     for (int idx = 0; idx < dataType.size(); ++idx) {
-      addField(idx, rawdataValues.get(idx));
+      addField(idx, rawDataValues.get(idx));
     }
   }
 
@@ -60,7 +73,7 @@ public class BeamRecord implements Serializable {
   }
 
   public Object getFieldValue(String fieldName) {
-    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+    return getFieldValue(dataType.getFieldNames().indexOf(fieldName));
   }
 
   public Byte getByte(String fieldName) {
@@ -179,7 +192,7 @@ public class BeamRecord implements Serializable {
     StringBuilder sb = new StringBuilder();
     for (int idx = 0; idx < size(); ++idx) {
       sb.append(
-          String.format(",%s=%s", getDataType().getFieldsName().get(idx), getFieldValue(idx)));
+          String.format(",%s=%s", getDataType().getFieldNames().get(idx), getFieldValue(idx)));
     }
     return sb.substring(1);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
index 3b20b50..6ab783c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
+import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -28,12 +29,12 @@ import org.apache.beam.sdk.coders.Coder;
  */
 @Experimental
 public class BeamRecordType implements Serializable{
-  private List<String> fieldsName;
-  private List<Coder> fieldsCoder;
+  private List<String> fieldNames;
+  private List<Coder> fieldCoders;
 
-  public BeamRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
-    this.fieldsName = fieldsName;
-    this.fieldsCoder = fieldsCoder;
+  public BeamRecordType(List<String> fieldNames, List<Coder> fieldCoders) {
+    this.fieldNames = fieldNames;
+    this.fieldCoders = fieldCoders;
   }
 
   /**
@@ -49,22 +50,22 @@ public class BeamRecordType implements Serializable{
     * Get the coder for {@link BeamRecordCoder}.
     */
    public BeamRecordCoder getRecordCoder(){
-     return BeamRecordCoder.of(this, fieldsCoder);
+     return BeamRecordCoder.of(this, fieldCoders);
    }
 
-   public List<String> getFieldsName(){
-     return fieldsName;
+   public List<String> getFieldNames(){
+     return ImmutableList.copyOf(fieldNames);
    }
 
-   public String getFieldByIndex(int index){
-     return fieldsName.get(index);
+   public String getFieldNameByIndex(int index){
+     return fieldNames.get(index);
    }
 
    public int findIndexOfField(String fieldName){
-     return fieldsName.indexOf(fieldName);
+     return fieldNames.indexOf(fieldName);
    }
 
   public int size(){
-    return fieldsName.size();
+    return fieldNames.size();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index d0a6360..ac617ad 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -179,7 +179,7 @@ public class BeamSql {
 
         getSqlEnv().registerTable(sourceTag.getId(),
             new BeamPCollectionTable(sourceStream,
-                (BeamSqlRecordType) sourceCoder.getRecordType()));
+                (BeamRecordSqlType) sourceCoder.getRecordType()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index 3c5eb36..4d21425 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.calcite.DataContext;
@@ -84,8 +84,8 @@ public class BeamSqlEnv implements Serializable{
   }
 
   private static class BeamCalciteTable implements ScannableTable, Serializable {
-    private BeamSqlRecordType beamSqlRowType;
-    public BeamCalciteTable(BeamSqlRecordType beamSqlRowType) {
+    private BeamRecordSqlType beamSqlRowType;
+    public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) {
       this.beamSqlRowType = beamSqlRowType;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index acb5943..3a46acc 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -53,7 +53,7 @@ class BeamSqlExample {
     //define the input row format
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
-    BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+    BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
     BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
 
     //create a source PCollection with Create.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index d91b484..4b557f9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-rowrecord used as Group-By keys.
    */
-  private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSqlRecordType inputRowType = CalciteUtils.toBeamRowType(relDataType);
+  private BeamRecordSqlType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamRecordSqlType inputRowType = CalciteUtils.toBeamRowType(relDataType);
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
-        fieldNames.add(inputRowType.getFieldsName().get(i));
-        fieldTypes.add(inputRowType.getFieldsType().get(i));
+        fieldNames.add(inputRowType.getFieldNameByIndex(i));
+        fieldTypes.add(inputRowType.getFieldTypeByIndex(i));
       }
     }
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamRecordSqlType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Type of sub-rowrecord, that represents the list of aggregation fields.
    */
-  private BeamSqlRecordType exAggFieldsSchema() {
+  private BeamRecordSqlType exAggFieldsSchema() {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
       fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
     }
 
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamRecordSqlType.create(fieldNames, fieldTypes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 2bd15b3..9dceb25 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
       BeamSqlEnv sqlEnv)
       throws Exception {
     BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamSqlRecordType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+    BeamRecordSqlType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
     PCollection<BeamRecord> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
 
     final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
@@ -117,9 +117,9 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     List<Integer> types = new ArrayList<>(pairs.size());
     for (int i = 0; i < pairs.size(); i++) {
       names.add("c" + i);
-      types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
+      types.add(leftRowType.getFieldTypeByIndex(pairs.get(i).getKey()));
     }
-    BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+    BeamRecordSqlType extractKeyRowType = BeamRecordSqlType.create(names, types);
 
     Coder extractKeyRowCoder = extractKeyRowType.getRecordCoder();
 
@@ -255,7 +255,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
   }
 
   private BeamRecord buildNullRow(BeamRelNode relNode) {
-    BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
+    BeamRecordSqlType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
     return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 1d666ca..fde002e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -63,12 +63,12 @@ public class BeamValuesRel extends Values implements BeamRelNode {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
+    BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
       List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
       for (int i = 0; i < tuple.size(); i++) {
         fieldsValue.add(BeamTableUtils.autoCastField(
-            beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue()));
+            beamSQLRowType.getFieldTypeByIndex(i), tuple.get(i).getValue()));
       }
       rows.add(new BeamRecord(beamSQLRowType, fieldsValue));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index c6a5d26..0f90bee 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -59,11 +59,11 @@ public class BeamAggregationTransforms implements Serializable{
    * Merge KV to single record.
    */
   public static class MergeAggregationRecord extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> {
-    private BeamSqlRecordType outRowType;
+    private BeamRecordSqlType outRowType;
     private List<String> aggFieldNames;
     private int windowStartFieldIdx;
 
-    public MergeAggregationRecord(BeamSqlRecordType outRowType, List<AggregateCall> aggList
+    public MergeAggregationRecord(BeamRecordSqlType outRowType, List<AggregateCall> aggList
         , int windowStartFieldIdx) {
       this.outRowType = outRowType;
       this.aggFieldNames = new ArrayList<>();
@@ -75,10 +75,11 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      List<Object> fieldValues = new ArrayList<>();
       KV<BeamRecord, BeamRecord> kvRecord = c.element();
+      List<Object> fieldValues = new ArrayList<>();
       fieldValues.addAll(kvRecord.getKey().getDataValues());
       fieldValues.addAll(kvRecord.getValue().getDataValues());
+
       if (windowStartFieldIdx != -1) {
         fieldValues.add(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
       }
@@ -106,7 +107,7 @@ public class BeamAggregationTransforms implements Serializable{
 
     @Override
     public BeamRecord apply(BeamRecord input) {
-      BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
+      BeamRecordSqlType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
 
       List<Object> fieldValues = new ArrayList<>(groupByKeys.size());
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
@@ -117,14 +118,14 @@ public class BeamAggregationTransforms implements Serializable{
       return keyOfRecord;
     }
 
-    private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+    private BeamRecordSqlType exTypeOfKeyRecord(BeamRecordSqlType dataType) {
       List<String> fieldNames = new ArrayList<>();
       List<Integer> fieldTypes = new ArrayList<>();
       for (int idx : groupByKeys) {
-        fieldNames.add(dataType.getFieldsName().get(idx));
-        fieldTypes.add(dataType.getFieldsType().get(idx));
+        fieldNames.add(dataType.getFieldNameByIndex(idx));
+        fieldTypes.add(dataType.getFieldTypeByIndex(idx));
       }
-      return BeamSqlRecordType.create(fieldNames, fieldTypes);
+      return BeamRecordSqlType.create(fieldNames, fieldTypes);
     }
   }
 
@@ -152,10 +153,10 @@ public class BeamAggregationTransforms implements Serializable{
     extends CombineFn<BeamRecord, AggregationAccumulator, BeamRecord> {
     private List<BeamSqlUdaf> aggregators;
     private List<BeamSqlExpression> sourceFieldExps;
-    private BeamSqlRecordType finalRowType;
+    private BeamRecordSqlType finalRowType;
 
     public AggregationAdaptor(List<AggregateCall> aggregationCalls,
-        BeamSqlRecordType sourceRowType) {
+        BeamRecordSqlType sourceRowType) {
       aggregators = new ArrayList<>();
       sourceFieldExps = new ArrayList<>();
       List<String> outFieldsName = new ArrayList<>();
@@ -204,7 +205,7 @@ public class BeamAggregationTransforms implements Serializable{
           break;
         }
       }
-      finalRowType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
+      finalRowType = BeamRecordSqlType.create(outFieldsName, outFieldsType);
     }
     @Override
     public AggregationAccumulator createAccumulator() {

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 8f34704..9a48c53 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -58,12 +58,12 @@ public class BeamJoinTransforms {
       for (int i = 0; i < joinColumns.size(); i++) {
         names.add("c" + i);
         types.add(isLeft
-            ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
-                .get(joinColumns.get(i).getKey())
-            : BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
-                .get(joinColumns.get(i).getValue()));
+            ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldTypeByIndex(
+                joinColumns.get(i).getKey())
+            : BeamSqlRecordHelper.getSqlRecordType(input).getFieldTypeByIndex(
+                joinColumns.get(i).getValue()));
       }
-      BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+      BeamRecordSqlType type = BeamRecordSqlType.create(names, types);
 
       // build the row
       List<Object> fieldValues = new ArrayList<>(joinColumns.size());
@@ -146,13 +146,13 @@ public class BeamJoinTransforms {
       BeamRecord rightRow) {
     // build the type
     List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
-    names.addAll(leftRow.getDataType().getFieldsName());
-    names.addAll(rightRow.getDataType().getFieldsName());
+    names.addAll(leftRow.getDataType().getFieldNames());
+    names.addAll(rightRow.getDataType().getFieldNames());
 
     List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
-    types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType());
-    types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
-    BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+    types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldTypes());
+    types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldTypes());
+    BeamRecordSqlType type = BeamRecordSqlType.create(names, types);
 
     List<Object> fieldValues = new ArrayList<>(leftRow.getDataValues());
     fieldValues.addAll(rightRow.getDataValues());

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index 34d6dbb..aac38c7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.values.BeamRecord;
 public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
   private String stepName;
   private BeamSqlExpressionExecutor executor;
-  private BeamSqlRecordType outputRowType;
+  private BeamRecordSqlType outputRowType;
 
   public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
-      BeamSqlRecordType outputRowType) {
+      BeamRecordSqlType outputRowType) {
     super();
     this.stepName = stepName;
     this.executor = executor;
@@ -57,7 +57,7 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
     List<Object> fieldsValue = new ArrayList<>(results.size());
     for (int idx = 0; idx < results.size(); ++idx) {
       fieldsValue.add(
-          BeamTableUtils.autoCastField(outputRowType.getFieldsType().get(idx), results.get(idx)));
+          BeamTableUtils.autoCastField(outputRowType.getFieldTypeByIndex(idx), results.get(idx)));
     }
     BeamRecord outRow = new BeamRecord(outputRowType, fieldsValue);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index bf96e85..8b6206b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,33 +78,33 @@ public class CalciteUtils {
   /**
    * Get the {@code SqlTypeName} for the specified column of a table.
    */
-  public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
-    return toCalciteType(schema.getFieldsType().get(index));
+  public static SqlTypeName getFieldType(BeamRecordSqlType schema, int index) {
+    return toCalciteType(schema.getFieldTypeByIndex(index));
   }
 
   /**
    * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
    */
-  public static BeamSqlRecordType toBeamRowType(RelDataType tableInfo) {
+  public static BeamRecordSqlType toBeamRowType(RelDataType tableInfo) {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (RelDataTypeField f : tableInfo.getFieldList()) {
       fieldNames.add(f.getName());
       fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
     }
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamRecordSqlType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Create an instance of {@code RelDataType} so it can be used to create a table.
    */
-  public static RelProtoDataType toCalciteRowType(final BeamSqlRecordType that) {
+  public static RelProtoDataType toCalciteRowType(final BeamRecordSqlType that) {
     return new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a) {
         RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
-        for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
-          builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
+        for (int idx = 0; idx < that.getFieldNames().size(); ++idx) {
+          builder.add(that.getFieldNameByIndex(idx), toCalciteType(that.getFieldTypeByIndex(idx)));
         }
         return builder.build();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
index 68b120e..0564820 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
  * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
  */
 public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
-  protected BeamSqlRecordType beamSqlRowType;
-  public BaseBeamTable(BeamSqlRecordType beamSqlRowType) {
+  protected BeamRecordSqlType beamSqlRowType;
+  public BaseBeamTable(BeamRecordSqlType beamSqlRowType) {
     this.beamSqlRowType = beamSqlRowType;
   }
 
-  @Override public BeamSqlRecordType getRowType() {
+  @Override public BeamRecordSqlType getRowType() {
     return beamSqlRowType;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
index 68905b5..9d9988e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -32,12 +32,12 @@ public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
   private transient PCollection<BeamRecord> upstream;
 
-  protected BeamPCollectionTable(BeamSqlRecordType beamSqlRowType) {
+  protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
   public BeamPCollectionTable(PCollection<BeamRecord> upstream,
-      BeamSqlRecordType beamSqlRowType){
+      BeamRecordSqlType beamSqlRowType){
     this(beamSqlRowType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
new file mode 100644
index 0000000..1845988
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} whose fields are described by {@link Types} codes.
+ *
+ * <p>Only a limited set of SQL types is supported for now; see
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for more details. Instances are normally built with {@link #create(List, List)}, which
+ * picks one coder per field and rejects type codes it has no coder for.
+ */
+public class BeamRecordSqlType extends BeamRecordType {
+  private static final Map<Integer, Class<?>> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static { // java.sql.Types code -> exact Java class a non-null field value must have
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  public List<Integer> fieldTypes; // SQL type code per field; unset by the protected constructor
+
+  protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder); // note: fieldTypes is not assigned on this path
+  }
+
+  private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes
+      , List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+    this.fieldTypes = fieldTypes;
+  }
+
+  public static BeamRecordSqlType create(List<String> fieldNames,
+      List<Integer> fieldTypes) {
+    if (fieldNames.size() != fieldTypes.size()) {
+      throw new IllegalStateException("the sizes of 'fieldNames' and 'fieldTypes' must match.");
+    }
+    List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size()); // one coder per field, in order
+    for (int idx = 0; idx < fieldTypes.size(); ++idx) {
+      switch (fieldTypes.get(idx)) {
+      case Types.INTEGER:
+        fieldCoders.add(BigEndianIntegerCoder.of());
+        break;
+      case Types.SMALLINT:
+        fieldCoders.add(ShortCoder.of());
+        break;
+      case Types.TINYINT:
+        fieldCoders.add(ByteCoder.of());
+        break;
+      case Types.DOUBLE:
+        fieldCoders.add(DoubleCoder.of());
+        break;
+      case Types.FLOAT:
+        fieldCoders.add(FloatCoder.of());
+        break;
+      case Types.DECIMAL:
+        fieldCoders.add(BigDecimalCoder.of());
+        break;
+      case Types.BIGINT:
+        fieldCoders.add(BigEndianLongCoder.of());
+        break;
+      case Types.VARCHAR:
+      case Types.CHAR:
+        fieldCoders.add(StringUtf8Coder.of());
+        break;
+      case Types.TIME:
+        fieldCoders.add(TimeCoder.of());
+        break;
+      case Types.DATE:
+      case Types.TIMESTAMP:
+        fieldCoders.add(DateCoder.of());
+        break;
+      case Types.BOOLEAN:
+        fieldCoders.add(BooleanCoder.of());
+        break;
+
+      default: // every other java.sql.Types code has no coder assigned in this switch
+        throw new UnsupportedOperationException(
+            "Data type: " + fieldTypes.get(idx) + " not supported yet!");
+      }
+    }
+    return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
+  }
+
+  @Override
+  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    if (null == fieldValue) { // NULL is accepted for every field type, so skip the check
+      return;
+    }
+
+    int fieldType = fieldTypes.get(index);
+    Class<?> javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+    if (javaClazz == null) {
+      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) { // exact class match; subclasses are rejected
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  public List<Integer> getFieldTypes() { // returns the internal list itself, not a copy
+    return fieldTypes;
+  }
+
+  public Integer getFieldTypeByIndex(int index){ // java.sql.Types code of the field at index
+    return fieldTypes.get(index);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof BeamRecordSqlType) { // instanceof is already false for null
+      BeamRecordSqlType ins = (BeamRecordSqlType) obj;
+      return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames());
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() { // built from the same two lists that equals() compares
+    return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "BeamRecordSqlType [fieldNames=" + getFieldNames()
+        + ", fieldTypes=" + fieldTypes + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
index b910c84..89eefd1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
@@ -39,8 +39,8 @@ import org.apache.beam.sdk.values.BeamRecord;
 @Experimental
 public class BeamSqlRecordHelper {
 
-  public static BeamSqlRecordType getSqlRecordType(BeamRecord record) {
-    return (BeamSqlRecordType) record.getDataType();
+  public static BeamRecordSqlType getSqlRecordType(BeamRecord record) {
+    return (BeamRecordSqlType) record.getDataType();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
deleted file mode 100644
index b7c7438..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.BeamRecordType;
-
-/**
- * Type provider for {@link BeamRecord} with SQL types.
- *
- * <p>Limited SQL types are supported now, visit
- * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
- * for more details.
- *
- */
-public class BeamSqlRecordType extends BeamRecordType {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
-  static {
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
-
-  public List<Integer> fieldsType;
-
-  protected BeamSqlRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
-    super(fieldsName, fieldsCoder);
-  }
-
-  private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType
-      , List<Coder> fieldsCoder) {
-    super(fieldsName, fieldsCoder);
-    this.fieldsType = fieldsType;
-  }
-
-  public static BeamSqlRecordType create(List<String> fieldNames,
-      List<Integer> fieldTypes) {
-    if (fieldNames.size() != fieldTypes.size()) {
-      throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
-    }
-    List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
-    for (int idx = 0; idx < fieldTypes.size(); ++idx) {
-      switch (fieldTypes.get(idx)) {
-      case Types.INTEGER:
-        fieldCoders.add(BigEndianIntegerCoder.of());
-        break;
-      case Types.SMALLINT:
-        fieldCoders.add(ShortCoder.of());
-        break;
-      case Types.TINYINT:
-        fieldCoders.add(ByteCoder.of());
-        break;
-      case Types.DOUBLE:
-        fieldCoders.add(DoubleCoder.of());
-        break;
-      case Types.FLOAT:
-        fieldCoders.add(FloatCoder.of());
-        break;
-      case Types.DECIMAL:
-        fieldCoders.add(BigDecimalCoder.of());
-        break;
-      case Types.BIGINT:
-        fieldCoders.add(BigEndianLongCoder.of());
-        break;
-      case Types.VARCHAR:
-      case Types.CHAR:
-        fieldCoders.add(StringUtf8Coder.of());
-        break;
-      case Types.TIME:
-        fieldCoders.add(TimeCoder.of());
-        break;
-      case Types.DATE:
-      case Types.TIMESTAMP:
-        fieldCoders.add(DateCoder.of());
-        break;
-      case Types.BOOLEAN:
-        fieldCoders.add(BooleanCoder.of());
-        break;
-
-      default:
-        throw new UnsupportedOperationException(
-            "Data type: " + fieldTypes.get(idx) + " not supported yet!");
-      }
-    }
-    return new BeamSqlRecordType(fieldNames, fieldTypes, fieldCoders);
-  }
-
-  @Override
-  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
-    if (null == fieldValue) {// no need to do type check for NULL value
-      return;
-    }
-
-    int fieldType = fieldsType.get(index);
-    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
-    if (javaClazz == null) {
-      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
-    }
-
-    if (!fieldValue.getClass().equals(javaClazz)) {
-      throw new IllegalArgumentException(
-          String.format("[%s](%s) doesn't match type [%s]",
-              fieldValue, fieldValue.getClass(), fieldType)
-      );
-    }
-  }
-
-  public List<Integer> getFieldsType() {
-    return fieldsType;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj != null && obj instanceof BeamSqlRecordType) {
-      BeamSqlRecordType ins = (BeamSqlRecordType) obj;
-      return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
index b370d9d..828ac43 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -49,5 +49,5 @@ public interface BeamSqlTable {
   /**
    * Get the schema info of the table.
    */
-   BeamSqlRecordType getRowType();
+   BeamRecordSqlType getRowType();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 19d3e39..99f9522 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -40,27 +40,27 @@ public final class BeamTableUtils {
   public static BeamRecord csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSqlRecordType beamSqlRowType) {
-    List<Object> fieldsValue = new ArrayList<>(beamSqlRowType.size());
+      BeamRecordSqlType beamRecordSqlType) {
+    List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.size());
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
 
-      if (rawRecord.size() != beamSqlRowType.size()) {
+      if (rawRecord.size() != beamRecordSqlType.size()) {
         throw new IllegalArgumentException(String.format(
             "Expect %d fields, but actually %d",
-            beamSqlRowType.size(), rawRecord.size()
+            beamRecordSqlType.size(), rawRecord.size()
         ));
       } else {
-        for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
+        for (int idx = 0; idx < beamRecordSqlType.size(); idx++) {
           String raw = rawRecord.get(idx);
-          fieldsValue.add(autoCastField(beamSqlRowType.getFieldsType().get(idx), raw));
+          fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
         }
       }
     } catch (IOException e) {
       throw new IllegalArgumentException("decodeRecord failed!", e);
     }
-    return new BeamRecord(beamSqlRowType, fieldsValue);
+    return new BeamRecord(beamRecordSqlType, fieldsValue);
   }
 
   public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
index f137379..8c7e6f0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.schema.kafka;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -34,12 +34,12 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
   private CSVFormat csvFormat;
-  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
     this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
   }
 
-  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
       List<String> topics, CSVFormat format) {
     super(beamSqlRowType, bootstrapServers, topics);
     this.csvFormat = format;
@@ -63,9 +63,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
    */
   public static class CsvRecorderDecoder
       extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
-    private BeamSqlRecordType rowType;
+    private BeamRecordSqlType rowType;
     private CSVFormat format;
-    public CsvRecorderDecoder(BeamSqlRecordType rowType, CSVFormat format) {
+    public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
       this.rowType = rowType;
       this.format = format;
     }
@@ -88,9 +88,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
    */
   public static class CsvRecorderEncoder
       extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
-    private BeamSqlRecordType rowType;
+    private BeamRecordSqlType rowType;
     private CSVFormat format;
-    public CsvRecorderEncoder(BeamSqlRecordType rowType, CSVFormat format) {
+    public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
       this.rowType = rowType;
       this.format = format;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
index fac57bf..1d57839 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -48,11 +48,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   private List<String> topics;
   private Map<String, Object> configUpdates;
 
-  protected BeamKafkaTable(BeamSqlRecordType beamSqlRowType) {
+  protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
-  public BeamKafkaTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
+  public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
     super(beamSqlRowType);
     this.bootstrapServers = bootstrapServers;

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
index 0ec418c..79e56e6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.schema.text;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -46,11 +46,11 @@ public class BeamTextCSVTable extends BeamTextTable {
   /**
    * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
    */
-  public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern)  {
+  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern)  {
     this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
   }
 
-  public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern,
+  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     super(beamSqlRowType, filePattern);
     this.csvFormat = csvFormat;

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
index ecb77e0..018dae5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.schema.text;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -35,10 +35,10 @@ public class BeamTextCSVTableIOReader
     extends PTransform<PCollection<String>, PCollection<BeamRecord>>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRecordType beamSqlRowType;
+  protected BeamRecordSqlType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRowType, String filePattern,
+  public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRowType = beamSqlRowType;

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
index c616973..53eb382 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.schema.text;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -36,10 +36,10 @@ import org.apache.commons.csv.CSVFormat;
 public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRecordType beamSqlRowType;
+  protected BeamRecordSqlType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRowType, String filePattern,
+  public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRowType = beamSqlRowType;

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
index 4284366..80e81aa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text;
 import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 
 /**
  * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
   protected String filePattern;
 
-  protected BeamTextTable(BeamSqlRecordType beamSqlRowType, String filePattern) {
+  protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
     super(beamSqlRowType);
     this.filePattern = filePattern;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 19ca398..4e74dbb 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
@@ -54,7 +54,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamRecord> result =
         input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamRecord record = new BeamRecord(resultType, 0, 4L);
@@ -95,7 +95,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testAggregationFunctions", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(
         Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
             "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
             "max5", "min5", "max6", "min6"),
@@ -141,7 +141,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamRecord> result =
         input.apply("testDistinct", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamRecord record1 = new BeamRecord(resultType, 1, 1000L);
@@ -179,7 +179,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testTumbleWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
@@ -215,7 +215,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamRecord> result =
         input.apply("testHopWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
@@ -254,7 +254,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testSessionWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 02427ae..ef75ee2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -25,7 +25,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
@@ -52,7 +52,7 @@ public class BeamSqlDslBase {
   @Rule
   public ExpectedException exceptions = ExpectedException.none();
 
-  public static BeamSqlRecordType rowTypeInTableA;
+  public static BeamRecordSqlType rowTypeInTableA;
   public static List<BeamRecord> recordsInTableA;
 
   //bounded PCollections
@@ -65,7 +65,7 @@ public class BeamSqlDslBase {
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
-    rowTypeInTableA = BeamSqlRecordType.create(
+    rowTypeInTableA = BeamRecordSqlType.create(
         Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
             "f_timestamp", "f_int2", "f_decimal"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index d5d0a24..0876dd9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -24,7 +24,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo
 import java.sql.Types;
 import java.util.Arrays;
 import org.apache.beam.sdk.coders.BeamRecordCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
-  private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
-      BeamSqlRecordType.create(
+  private static final BeamRecordSqlType SOURCE_RECORD_TYPE =
+      BeamRecordSqlType.create(
           Arrays.asList(
               "order_id", "site_id", "price"
           ),
@@ -53,8 +53,8 @@ public class BeamSqlDslJoinTest {
 
   private static final BeamRecordCoder SOURCE_CODER = SOURCE_RECORD_TYPE.getRecordCoder();
 
-  private static final BeamSqlRecordType RESULT_RECORD_TYPE =
-      BeamSqlRecordType.create(
+  private static final BeamRecordSqlType RESULT_RECORD_TYPE =
+      BeamRecordSqlType.create(
           Arrays.asList(
           "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
           ),

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index c8041a8..46aea99 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
@@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testPartialFields", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamRecord record = new BeamRecord(resultType
@@ -115,7 +115,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamRecord record1 = new BeamRecord(resultType
@@ -158,7 +158,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testPartialFieldsInRows", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamRecord record1 = new BeamRecord(resultType
@@ -201,7 +201,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testLiteralField", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
     BeamRecord record = new BeamRecord(resultType, 1);

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 25e76e9..7302376 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
@@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdaf() throws Exception {
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "squaresum"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
     BeamRecord record = new BeamRecord(resultType, 0, 30);
@@ -67,7 +67,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdf() throws Exception{
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
+    BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "cubicvalue"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
     BeamRecord record = new BeamRecord(resultType, 2, 8);

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index e9dc88f..aa1fc29 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.BeamRecord;
 
@@ -69,7 +69,7 @@ public class TestUtils {
    * {@code}
    */
   public static class RowsBuilder {
-    private BeamSqlRecordType type;
+    private BeamRecordSqlType type;
     private List<BeamRecord> rows = new ArrayList<>();
 
     /**
@@ -86,7 +86,7 @@ public class TestUtils {
      * @args pairs of column type and column names.
      */
     public static RowsBuilder of(final Object... args) {
-      BeamSqlRecordType beamSQLRowType = buildBeamSqlRowType(args);
+      BeamRecordSqlType beamSQLRowType = buildBeamSqlRowType(args);
       RowsBuilder builder = new RowsBuilder();
       builder.type = beamSQLRowType;
 
@@ -103,7 +103,7 @@ public class TestUtils {
      * )}</pre>
      * @beamSQLRowType the record type.
      */
-    public static RowsBuilder of(final BeamSqlRecordType beamSQLRowType) {
+    public static RowsBuilder of(final BeamRecordSqlType beamSQLRowType) {
       RowsBuilder builder = new RowsBuilder();
       builder.type = beamSQLRowType;
 
@@ -153,7 +153,7 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static BeamSqlRecordType buildBeamSqlRowType(Object... args) {
+  public static BeamRecordSqlType buildBeamSqlRowType(Object... args) {
     List<Integer> types = new ArrayList<>();
     List<String> names = new ArrayList<>();
 
@@ -162,7 +162,7 @@ public class TestUtils {
       names.add((String) args[i + 1]);
     }
 
-    return BeamSqlRecordType.create(names, types);
+    return BeamRecordSqlType.create(names, types);
   }
 
   /**
@@ -179,7 +179,7 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static List<BeamRecord> buildRows(BeamSqlRecordType type, List args) {
+  public static List<BeamRecord> buildRows(BeamRecordSqlType type, List args) {
     List<BeamRecord> rows = new ArrayList<>();
     int fieldCount = type.size();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 86e2ca4..97905c5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
@@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase {
       RelDataTypeSystem.DEFAULT);
   public static RelDataType relDataType;
 
-  public static BeamSqlRecordType beamRowType;
+  public static BeamRecordSqlType beamRowType;
   public static BeamRecord record;
 
   public static RelBuilder relBuilder;

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index b58a17f..5898e2e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,7 +32,7 @@ import java.util.TimeZone;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.BeamRecord;
@@ -62,7 +62,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
   public final TestPipeline pipeline = TestPipeline.create();
 
   protected PCollection<BeamRecord> getTestPCollection() {
-    BeamSqlRecordType type = BeamSqlRecordType.create(
+    BeamRecordSqlType type = BeamRecordSqlType.create(
         Arrays.asList("ts", "c_tinyint", "c_smallint",
             "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
             "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
@@ -155,7 +155,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
         PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
         PAssert.that(rows).containsInAnyOrder(
             TestUtils.RowsBuilder
-                .of(BeamSqlRecordType.create(names, types))
+                .of(BeamRecordSqlType.create(names, types))
                 .addRows(values)
                 .getRows()
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index 3569e31..4ce2f45 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -22,7 +22,7 @@ import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.Arrays;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;
@@ -282,7 +282,7 @@ public class BeamSqlComparisonOperatorsIntegrationTest
   }
 
   @Override protected PCollection<BeamRecord> getTestPCollection() {
-    BeamSqlRecordType type = BeamSqlRecordType.create(
+    BeamRecordSqlType type = BeamRecordSqlType.create(
         Arrays.asList(
             "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
             "c_smallint_0", "c_smallint_1", "c_smallint_2",