You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lz...@apache.org on 2017/07/19 02:06:41 UTC

[1/3] beam git commit: [BEAM-2621] BeamSqlRecordType -> BeamSqlRowType

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL a1f7cf6de -> d4d615a72


http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 41a786f..9ed56b4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -46,13 +46,13 @@ public class BeamTextCSVTable extends BeamTextTable {
   /**
    * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
    */
-  public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern)  {
-    this(beamSqlRecordType, filePattern, CSVFormat.DEFAULT);
+  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern)  {
+    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
   }
 
-  public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
-    super(beamSqlRecordType, filePattern);
+    super(beamSqlRowType, filePattern);
     this.csvFormat = csvFormat;
   }
 
@@ -60,11 +60,11 @@ public class BeamTextCSVTable extends BeamTextTable {
   public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
-            new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
+            new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
   }
 
   @Override
   public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
-    return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
+    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index ef0a465..874c3e4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text;
 import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -37,13 +36,13 @@ public class BeamTextCSVTableIOReader
     extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRecordType beamSqlRecordType;
+  protected BeamSqlRowType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
-    this.beamSqlRecordType = beamSqlRecordType;
+    this.beamSqlRowType = beamSqlRowType;
     this.csvFormat = csvFormat;
   }
 
@@ -53,7 +52,7 @@ public class BeamTextCSVTableIOReader
           @ProcessElement
           public void processElement(ProcessContext ctx) {
             String str = ctx.element();
-            ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRecordType));
+            ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
           }
         }));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index 35a546c..f61bb71 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text;
 import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
 
 import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,13 +37,13 @@ import org.apache.commons.csv.CSVFormat;
 public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRecordType beamSqlRecordType;
+  protected BeamSqlRowType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
-    this.beamSqlRecordType = beamSqlRecordType;
+    this.beamSqlRowType = beamSqlRowType;
     this.csvFormat = csvFormat;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
index 525c210..6dc6cd0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 
 /**
  * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
@@ -30,8 +30,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
   protected String filePattern;
 
-  protected BeamTextTable(BeamSqlRecordType beamSqlRecordType, String filePattern) {
-    super(beamSqlRecordType);
+  protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+    super(beamSqlRowType);
     this.filePattern = filePattern;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 34b169f..5b21765 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -27,8 +27,8 @@ import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
@@ -57,13 +57,13 @@ public class BeamAggregationTransforms implements Serializable{
    * Merge KV to single record.
    */
   public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
-    private BeamSqlRecordType outRecordType;
+    private BeamSqlRowType outRowType;
     private List<String> aggFieldNames;
     private int windowStartFieldIdx;
 
-    public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList
+    public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
         , int windowStartFieldIdx) {
-      this.outRecordType = outRecordType;
+      this.outRowType = outRowType;
       this.aggFieldNames = new ArrayList<>();
       for (AggregateCall ac : aggList) {
         aggFieldNames.add(ac.getName());
@@ -73,7 +73,7 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamSqlRow outRecord = new BeamSqlRow(outRecordType);
+      BeamSqlRow outRecord = new BeamSqlRow(outRowType);
       outRecord.updateWindowRange(c.element().getKey(), window);
 
       KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
@@ -109,7 +109,7 @@ public class BeamAggregationTransforms implements Serializable{
 
     @Override
     public BeamSqlRow apply(BeamSqlRow input) {
-      BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+      BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
       BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
       keyOfRecord.updateWindowRange(input, null);
 
@@ -119,14 +119,14 @@ public class BeamAggregationTransforms implements Serializable{
       return keyOfRecord;
     }
 
-    private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+    private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
       List<String> fieldNames = new ArrayList<>();
       List<Integer> fieldTypes = new ArrayList<>();
       for (int idx : groupByKeys) {
         fieldNames.add(dataType.getFieldsName().get(idx));
         fieldTypes.add(dataType.getFieldsType().get(idx));
       }
-      return BeamSqlRecordType.create(fieldNames, fieldTypes);
+      return BeamSqlRowType.create(fieldNames, fieldTypes);
     }
   }
 
@@ -154,10 +154,10 @@ public class BeamAggregationTransforms implements Serializable{
     extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
     private List<BeamSqlUdaf> aggregators;
     private List<BeamSqlExpression> sourceFieldExps;
-    private BeamSqlRecordType finalRecordType;
+    private BeamSqlRowType finalRowType;
 
     public AggregationAdaptor(List<AggregateCall> aggregationCalls,
-        BeamSqlRecordType sourceRowRecordType) {
+        BeamSqlRowType sourceRowType) {
       aggregators = new ArrayList<>();
       sourceFieldExps = new ArrayList<>();
       List<String> outFieldsName = new ArrayList<>();
@@ -165,7 +165,7 @@ public class BeamAggregationTransforms implements Serializable{
       for (AggregateCall call : aggregationCalls) {
         int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
         BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
-            CalciteUtils.getFieldType(sourceRowRecordType, refIndex), refIndex);
+            CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
         sourceFieldExps.add(sourceExp);
 
         outFieldsName.add(call.name);
@@ -206,7 +206,7 @@ public class BeamAggregationTransforms implements Serializable{
           break;
         }
       }
-      finalRecordType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
+      finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
     }
     @Override
     public AggregationAccumulator createAccumulator() {
@@ -241,7 +241,7 @@ public class BeamAggregationTransforms implements Serializable{
     }
     @Override
     public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
-      BeamSqlRow result = new BeamSqlRow(finalRecordType);
+      BeamSqlRow result = new BeamSqlRow(finalRowType);
       for (int idx = 0; idx < aggregators.size(); ++idx) {
         result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
index 8169b83..9ea4376 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
@@ -60,7 +60,7 @@ public class BeamJoinTransforms {
             ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
             input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
       }
-      BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+      BeamSqlRowType type = BeamSqlRowType.create(names, types);
 
       // build the row
       BeamSqlRow row = new BeamSqlRow(type);
@@ -149,7 +149,7 @@ public class BeamJoinTransforms {
     List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
     types.addAll(leftRow.getDataType().getFieldsType());
     types.addAll(rightRow.getDataType().getFieldsType());
-    BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+    BeamSqlRowType type = BeamSqlRowType.create(names, types);
 
     BeamSqlRow row = new BeamSqlRow(type);
     // build the row

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
index 2a3357c..886ddcf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.transform;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -34,14 +34,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
   private String stepName;
   private BeamSqlExpressionExecutor executor;
-  private BeamSqlRecordType outputRecordType;
+  private BeamSqlRowType outputRowType;
 
   public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
-      BeamSqlRecordType outputRecordType) {
+      BeamSqlRowType outputRowType) {
     super();
     this.stepName = stepName;
     this.executor = executor;
-    this.outputRecordType = outputRecordType;
+    this.outputRowType = outputRowType;
   }
 
   @Setup
@@ -51,11 +51,11 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
 
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) {
-    BeamSqlRow inputRecord = c.element();
-    List<Object> results = executor.execute(inputRecord);
+    BeamSqlRow inputRow = c.element();
+    List<Object> results = executor.execute(inputRow);
 
-    BeamSqlRow outRow = new BeamSqlRow(outputRecordType);
-    outRow.updateWindowRange(inputRecord, window);
+    BeamSqlRow outRow = new BeamSqlRow(outputRowType);
+    outRow.updateWindowRange(inputRow, window);
 
     for (int idx = 0; idx < results.size(); ++idx) {
       BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 919ae5f..4b8696b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,27 +78,27 @@ public class CalciteUtils {
   /**
    * Get the {@code SqlTypeName} for the specified column of a table.
    */
-  public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
+  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
     return toCalciteType(schema.getFieldsType().get(index));
   }
 
   /**
-   * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table.
+   * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
    */
-  public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) {
+  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (RelDataTypeField f : tableInfo.getFieldList()) {
       fieldNames.add(f.getName());
       fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
     }
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Create an instance of {@code RelDataType} so it can be used to create a table.
    */
-  public static RelProtoDataType toCalciteRecordType(final BeamSqlRecordType that) {
+  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
     return new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index 471a856..a142514 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -55,7 +55,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamSqlRow> result =
         input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record = new BeamSqlRow(resultType);
@@ -98,7 +98,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testAggregationFunctions", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
             "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
             "max5", "min5", "max6", "min6"),
@@ -167,7 +167,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamSqlRow> result =
         input.apply("testDistinct", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -216,7 +216,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testTumbleWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
@@ -263,7 +263,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamSqlRow> result =
         input.apply("testHopWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
@@ -325,7 +325,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testSessionWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
index 57fcbc3..24f1a0a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -24,9 +24,9 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
@@ -52,7 +52,7 @@ public class BeamSqlDslBase {
   @Rule
   public ExpectedException exceptions = ExpectedException.none();
 
-  public static BeamSqlRecordType recordTypeInTableA;
+  public static BeamSqlRowType rowTypeInTableA;
   public static List<BeamSqlRow> recordsInTableA;
 
   //bounded PCollections
@@ -65,22 +65,22 @@ public class BeamSqlDslBase {
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
-    recordTypeInTableA = BeamSqlRecordType.create(
+    rowTypeInTableA = BeamSqlRowType.create(
         Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
             "f_timestamp", "f_int2"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
             Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER));
 
-    recordsInTableA = prepareInputRecordsInTableA();
+    recordsInTableA = prepareInputRowsInTableA();
   }
 
   @Before
   public void preparePCollections(){
     boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
-        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
 
     boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
-        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
 
     unboundedInput1 = prepareUnboundedPCollection1();
     unboundedInput2 = prepareUnboundedPCollection2();
@@ -88,7 +88,7 @@ public class BeamSqlDslBase {
 
   private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
     TestStream.Builder<BeamSqlRow> values = TestStream
-        .create(new BeamSqlRowCoder(recordTypeInTableA));
+        .create(new BeamSqlRowCoder(rowTypeInTableA));
 
     for (BeamSqlRow row : recordsInTableA) {
       values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
@@ -100,7 +100,7 @@ public class BeamSqlDslBase {
 
   private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
     TestStream.Builder<BeamSqlRow> values = TestStream
-        .create(new BeamSqlRowCoder(recordTypeInTableA));
+        .create(new BeamSqlRowCoder(rowTypeInTableA));
 
     BeamSqlRow row = recordsInTableA.get(0);
     values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
@@ -109,10 +109,10 @@ public class BeamSqlDslBase {
     return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
   }
 
-  private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{
+  private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
     List<BeamSqlRow> rows = new ArrayList<>();
 
-    BeamSqlRow row1 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
     row1.addField(0, 1);
     row1.addField(1, 1000L);
     row1.addField(2, Short.valueOf("1"));
@@ -124,7 +124,7 @@ public class BeamSqlDslBase {
     row1.addField(8, 0);
     rows.add(row1);
 
-    BeamSqlRow row2 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
     row2.addField(0, 2);
     row2.addField(1, 2000L);
     row2.addField(2, Short.valueOf("2"));
@@ -136,7 +136,7 @@ public class BeamSqlDslBase {
     row2.addField(8, 0);
     rows.add(row2);
 
-    BeamSqlRow row3 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
     row3.addField(0, 3);
     row3.addField(1, 3000L);
     row3.addField(2, Short.valueOf("3"));
@@ -148,7 +148,7 @@ public class BeamSqlDslBase {
     row3.addField(8, 0);
     rows.add(row3);
 
-    BeamSqlRow row4 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
     row4.addField(0, 4);
     row4.addField(1, 4000L);
     row4.addField(2, Short.valueOf("4"));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
index ae5f4e5..e010915 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
@@ -23,9 +23,9 @@ import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
-  private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
-      BeamSqlRecordType.create(
+  private static final BeamSqlRowType SOURCE_RECORD_TYPE =
+      BeamSqlRowType.create(
           Arrays.asList(
               "order_id", "site_id", "price"
           ),
@@ -54,8 +54,8 @@ public class BeamSqlDslJoinTest {
   private static final BeamSqlRowCoder SOURCE_CODER =
       new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
 
-  private static final BeamSqlRecordType RESULT_RECORD_TYPE =
-      BeamSqlRecordType.create(
+  private static final BeamSqlRowType RESULT_RECORD_TYPE =
+      BeamSqlRowType.create(
           Arrays.asList(
           "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
           ),

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 10f61b0..ab5a639 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFields", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record = new BeamSqlRow(resultType);
@@ -116,7 +116,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -163,7 +163,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFieldsInRows", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -210,7 +210,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testLiteralField", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
     BeamSqlRow record = new BeamSqlRow(resultType);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
index 332a273..726f658 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Iterator;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
@@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdaf() throws Exception {
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
     BeamSqlRow record = new BeamSqlRow(resultType);
@@ -69,7 +69,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdf() throws Exception{
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
     BeamSqlRow record = new BeamSqlRow(resultType);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 8c0a28d..a669635 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -21,8 +21,8 @@ package org.apache.beam.dsls.sql;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -69,7 +69,7 @@ public class TestUtils {
    * {@code}
    */
   public static class RowsBuilder {
-    private BeamSqlRecordType type;
+    private BeamSqlRowType type;
     private List<BeamSqlRow> rows = new ArrayList<>();
 
     /**
@@ -86,9 +86,9 @@ public class TestUtils {
      * @args pairs of column type and column names.
      */
     public static RowsBuilder of(final Object... args) {
-      BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args);
+      BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
       RowsBuilder builder = new RowsBuilder();
-      builder.type = beamSQLRecordType;
+      builder.type = beamSQLRowType;
 
       return builder;
     }
@@ -99,13 +99,13 @@ public class TestUtils {
      * <p>For example:
      * <pre>{@code
      * TestUtils.RowsBuilder.of(
-     *   beamSqlRecordType
+     *   beamSqlRowType
      * )}</pre>
-     * @beamSQLRecordType the record type.
+     * @beamSQLRowType the record type.
      */
-    public static RowsBuilder of(final BeamSqlRecordType beamSQLRecordType) {
+    public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
       RowsBuilder builder = new RowsBuilder();
-      builder.type = beamSQLRecordType;
+      builder.type = beamSQLRowType;
 
       return builder;
     }
@@ -140,12 +140,12 @@ public class TestUtils {
   }
 
   /**
-   * Convenient way to build a {@code BeamSqlRecordType}.
+   * Convenient way to build a {@code BeamSqlRowType}.
    *
    * <p>e.g.
    *
    * <pre>{@code
-   *   buildBeamSqlRecordType(
+   *   buildBeamSqlRowType(
    *       Types.BIGINT, "order_id",
    *       Types.INTEGER, "site_id",
    *       Types.DOUBLE, "price",
@@ -153,7 +153,7 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) {
+  public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
     List<Integer> types = new ArrayList<>();
     List<String> names = new ArrayList<>();
 
@@ -162,7 +162,7 @@ public class TestUtils {
       names.add((String) args[i + 1]);
     }
 
-    return BeamSqlRecordType.create(names, types);
+    return BeamSqlRowType.create(names, types);
   }
 
   /**
@@ -172,14 +172,14 @@ public class TestUtils {
    *
    * <pre>{@code
    *   buildRows(
-   *       recordType,
+   *       rowType,
    *       1, 1, 1, // the first row
    *       2, 2, 2, // the second row
    *       ...
    *   )
    * }</pre>
    */
-  public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) {
+  public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
     List<BeamSqlRow> rows = new ArrayList<>();
     int fieldCount = type.size();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index ddbc3d8..b9ce9b4 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,9 +32,9 @@ import java.util.TimeZone;
 import org.apache.beam.dsls.sql.BeamSql;
 import org.apache.beam.dsls.sql.TestUtils;
 import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +63,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
   public final TestPipeline pipeline = TestPipeline.create();
 
   protected PCollection<BeamSqlRow> getTestPCollection() {
-    BeamSqlRecordType type = BeamSqlRecordType.create(
+    BeamSqlRowType type = BeamSqlRowType.create(
         Arrays.asList("ts", "c_tinyint", "c_smallint",
             "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
             "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
@@ -156,7 +156,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
         PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
         PAssert.that(rows).containsInAnyOrder(
             TestUtils.RowsBuilder
-                .of(BeamSqlRecordType.create(names, types))
+                .of(BeamSqlRowType.create(names, types))
                 .addRows(values)
                 .getRows()
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
index 5afd273..d7b54c7 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -23,8 +23,8 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
 import org.apache.beam.dsls.sql.planner.BeamRuleSets;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
@@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase {
       RelDataTypeSystem.DEFAULT);
   public static RelDataType relDataType;
 
-  public static BeamSqlRecordType beamRecordType;
+  public static BeamSqlRowType beamRowType;
   public static BeamSqlRow record;
 
   public static RelBuilder relBuilder;
@@ -70,8 +70,8 @@ public class BeamSqlFnExecutorTestBase {
         .add("price", SqlTypeName.DOUBLE)
         .add("order_time", SqlTypeName.BIGINT).build();
 
-    beamRecordType = CalciteUtils.toBeamRecordType(relDataType);
-    record = new BeamSqlRow(beamRecordType);
+    beamRowType = CalciteUtils.toBeamRowType(relDataType);
+    record = new BeamSqlRow(beamRowType);
 
     record.addField(0, 1234567L);
     record.addField(1, 0);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
index 84f49a9..6c1dcb2 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.dsls.sql.mock;
 
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
 import static org.apache.beam.dsls.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
@@ -25,8 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -45,8 +45,8 @@ public class MockedBoundedTable extends MockedTable {
   /** rows flow out from this table. */
   private final List<BeamSqlRow> rows = new ArrayList<>();
 
-  public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   /**
@@ -63,13 +63,13 @@ public class MockedBoundedTable extends MockedTable {
    * }</pre>
    */
   public static MockedBoundedTable of(final Object... args){
-    return new MockedBoundedTable(buildBeamSqlRecordType(args));
+    return new MockedBoundedTable(buildBeamSqlRowType(args));
   }
 
   /**
    * Build a mocked bounded table with the specified type.
    */
-  public static MockedBoundedTable of(final BeamSqlRecordType type) {
+  public static MockedBoundedTable of(final BeamSqlRowType type) {
     return new MockedBoundedTable(type);
   }
 
@@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
    * }</pre>
    */
   public MockedBoundedTable addRows(Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
+    List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
     this.rows.addAll(rows);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
index eed740a..858ae88 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.mock;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.values.PDone;
  */
 public abstract class MockedTable extends BaseBeamTable {
   public static final AtomicInteger COUNTER = new AtomicInteger();
-  public MockedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  public MockedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
index 0f8c912..ee6eb22 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -18,16 +18,16 @@
 
 package org.apache.beam.dsls.sql.mock;
 
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
 import static org.apache.beam.dsls.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +44,8 @@ public class MockedUnboundedTable extends MockedTable {
   private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
   /** specify the index of column in the row which stands for the event time field. */
   private int timestampField;
-  private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   /**
@@ -62,7 +62,7 @@ public class MockedUnboundedTable extends MockedTable {
    * }</pre>
    */
   public static MockedUnboundedTable of(final Object... args){
-    return new MockedUnboundedTable(buildBeamSqlRecordType(args));
+    return new MockedUnboundedTable(buildBeamSqlRowType(args));
   }
 
   public MockedUnboundedTable timestampColumnIndex(int idx) {
@@ -85,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable {
    * }</pre>
    */
   public MockedUnboundedTable addRows(Duration duration, Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
+    List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
     // record the watermark + rows
     this.timestampedRows.add(Pair.of(duration, rows));
     return this;
@@ -97,7 +97,7 @@ public class MockedUnboundedTable extends MockedTable {
 
   @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     TestStream.Builder<BeamSqlRow> values = TestStream.create(
-        new BeamSqlRowCoder(beamSqlRecordType));
+        new BeamSqlRowCoder(beamSqlRowType));
 
     for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
       values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index cf1d714..e41e341 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -58,10 +58,10 @@ public class BeamSqlRowCoderTest {
       }
     };
 
-    BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(
+    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(
         protoRowType.apply(new JavaTypeFactoryImpl(
             RelDataTypeSystem.DEFAULT)));
-    BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
+    BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
     row.addField("col_tinyint", Byte.valueOf("1"));
     row.addField("col_smallint", Short.valueOf("1"));
     row.addField("col_integer", 1);
@@ -77,7 +77,7 @@ public class BeamSqlRowCoderTest {
     row.addField("col_boolean", true);
 
 
-    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType);
+    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType);
     CoderProperties.coderDecodeEncodeEqual(coder, row);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 9cd0915..01cd960 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -19,10 +19,9 @@
 package org.apache.beam.dsls.sql.schema.kafka;
 
 import java.io.Serializable;
-
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -91,8 +90,8 @@ public class BeamKafkaCSVTableTest {
     pipeline.run();
   }
 
-  private static BeamSqlRecordType genRowType() {
-    return CalciteUtils.toBeamRecordType(new RelProtoDataType() {
+  private static BeamSqlRowType genRowType() {
+    return CalciteUtils.toBeamRowType(new RelProtoDataType() {
 
       @Override public RelDataType apply(RelDataTypeFactory a0) {
         return a0.builder().add("order_id", SqlTypeName.BIGINT)

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index 176df46..b6e11e5 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -31,10 +31,9 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -81,20 +80,20 @@ public class BeamTextCSVTableTest {
   private static File writerTargetFile;
 
   @Test public void testBuildIOReader() {
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
         readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
     PAssert.that(rows).containsInAnyOrder(testDataRows);
     pipeline.run();
   }
 
   @Test public void testBuildIOWriter() {
-    new BeamTextCSVTable(buildBeamSqlRecordType(),
+    new BeamTextCSVTable(buildBeamSqlRowType(),
         readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
-        .apply(new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath())
+        .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
             .buildIOWriter());
     pipeline.run();
 
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
         writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
 
     // confirm the two reads match
@@ -167,11 +166,11 @@ public class BeamTextCSVTableTest {
         .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
   }
 
-  private static BeamSqlRecordType buildBeamSqlRecordType() {
-    return CalciteUtils.toBeamRecordType(buildRelDataType());
+  private static BeamSqlRowType buildBeamSqlRowType() {
+    return CalciteUtils.toBeamRowType(buildRelDataType());
   }
 
   private static BeamSqlRow buildRow(Object[] data) {
-    return new BeamSqlRow(buildBeamSqlRecordType(), Arrays.asList(data));
+    return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index a0fed22..5d5d4fc 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -21,11 +21,10 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -64,9 +63,9 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
 
   private List<AggregateCall> aggCalls;
 
-  private BeamSqlRecordType keyType;
-  private BeamSqlRecordType aggPartType;
-  private BeamSqlRecordType outputType;
+  private BeamSqlRowType keyType;
+  private BeamSqlRowType aggPartType;
+  private BeamSqlRowType outputType;
 
   private BeamSqlRowCoder inRecordCoder;
   private BeamSqlRowCoder keyCoder;
@@ -405,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   /**
    * Row type of final output row.
    */
-  private BeamSqlRecordType prepareFinalRowType() {
+  private BeamSqlRowType prepareFinalRowType() {
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     List<KV<String, SqlTypeName>> columnMetadata =
         Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -433,7 +432,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return CalciteUtils.toBeamRecordType(builder.build());
+    return CalciteUtils.toBeamRowType(builder.build());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
index 2e91405..4045bc8 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
@@ -23,8 +23,8 @@ import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
@@ -38,7 +38,7 @@ import org.junit.BeforeClass;
 public class BeamTransformBaseTest {
   public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  public static  BeamSqlRecordType inputRowType;
+  public static BeamSqlRowType inputRowType;
   public static List<BeamSqlRow> inputRows;
 
   @BeforeClass
@@ -66,14 +66,14 @@ public class BeamTransformBaseTest {
   }
 
   /**
-   * create a {@code BeamSqlRecordType} for given column metadata.
+   * create a {@code BeamSqlRowType} for given column metadata.
    */
-  public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
+  public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return CalciteUtils.toBeamRecordType(builder.build());
+    return CalciteUtils.toBeamRowType(builder.build());
   }
 
   /**
@@ -89,7 +89,7 @@ public class BeamTransformBaseTest {
    */
   public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
       List<Object> rowValues){
-    BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata);
+    BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
 
     return new BeamSqlRow(rowType, rowValues);
   }


[2/3] beam git commit: [BEAM-2621] BeamSqlRecordType -> BeamSqlRowType

Posted by lz...@apache.org.
[BEAM-2621] BeamSqlRecordType -> BeamSqlRowType


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9c8a8a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9c8a8a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9c8a8a1

Branch: refs/heads/DSL_SQL
Commit: a9c8a8a1e1f21d31c973e12b3c05c118e21fa43b
Parents: a1f7cf6
Author: James Xu <xu...@gmail.com>
Authored: Mon Jul 17 17:55:56 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Wed Jul 19 10:05:17 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    | 12 +++---
 .../beam/dsls/sql/example/BeamSqlExample.java   |  4 +-
 .../interpreter/BeamSqlExpressionExecutor.java  |  2 +-
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |  4 +-
 .../operator/BeamSqlCaseExpression.java         |  8 ++--
 .../operator/BeamSqlCastExpression.java         | 22 +++++------
 .../operator/BeamSqlCompareExpression.java      |  6 +--
 .../interpreter/operator/BeamSqlExpression.java |  2 +-
 .../operator/BeamSqlInputRefExpression.java     |  4 +-
 .../operator/BeamSqlIsNotNullExpression.java    |  4 +-
 .../operator/BeamSqlIsNullExpression.java       |  4 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |  2 +-
 .../operator/BeamSqlReinterpretExpression.java  |  6 +--
 .../operator/BeamSqlUdfExpression.java          |  4 +-
 .../operator/BeamSqlWindowEndExpression.java    |  4 +-
 .../operator/BeamSqlWindowExpression.java       |  4 +-
 .../operator/BeamSqlWindowStartExpression.java  |  4 +-
 .../arithmetic/BeamSqlArithmeticExpression.java |  6 +--
 .../date/BeamSqlCurrentDateExpression.java      |  2 +-
 .../date/BeamSqlCurrentTimeExpression.java      |  2 +-
 .../date/BeamSqlCurrentTimestampExpression.java |  2 +-
 .../date/BeamSqlDateCeilExpression.java         |  4 +-
 .../date/BeamSqlDateFloorExpression.java        |  4 +-
 .../operator/date/BeamSqlExtractExpression.java |  4 +-
 .../operator/logical/BeamSqlAndExpression.java  |  4 +-
 .../operator/logical/BeamSqlNotExpression.java  |  4 +-
 .../operator/logical/BeamSqlOrExpression.java   |  4 +-
 .../math/BeamSqlMathBinaryExpression.java       |  4 +-
 .../math/BeamSqlMathUnaryExpression.java        |  4 +-
 .../operator/math/BeamSqlPiExpression.java      |  2 +-
 .../string/BeamSqlCharLengthExpression.java     |  4 +-
 .../string/BeamSqlConcatExpression.java         |  6 +--
 .../string/BeamSqlInitCapExpression.java        |  4 +-
 .../operator/string/BeamSqlLowerExpression.java |  4 +-
 .../string/BeamSqlOverlayExpression.java        | 10 ++---
 .../string/BeamSqlPositionExpression.java       |  8 ++--
 .../string/BeamSqlSubstringExpression.java      |  8 ++--
 .../operator/string/BeamSqlTrimExpression.java  | 10 ++---
 .../operator/string/BeamSqlUpperExpression.java |  4 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 22 +++++------
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  2 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  2 +-
 .../apache/beam/dsls/sql/rel/BeamJoinRel.java   | 12 +++---
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  4 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  2 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  9 ++---
 .../beam/dsls/sql/schema/BaseBeamTable.java     | 10 ++---
 .../dsls/sql/schema/BeamPCollectionTable.java   |  8 ++--
 .../beam/dsls/sql/schema/BeamSqlRecordType.java | 40 --------------------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 10 ++---
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  6 +--
 .../beam/dsls/sql/schema/BeamSqlRowType.java    | 40 ++++++++++++++++++++
 .../beam/dsls/sql/schema/BeamSqlTable.java      |  2 +-
 .../beam/dsls/sql/schema/BeamTableUtils.java    | 10 ++---
 .../sql/schema/kafka/BeamKafkaCSVTable.java     | 29 +++++++-------
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 11 +++---
 .../dsls/sql/schema/text/BeamTextCSVTable.java  | 14 +++----
 .../schema/text/BeamTextCSVTableIOReader.java   | 11 +++---
 .../schema/text/BeamTextCSVTableIOWriter.java   |  9 ++---
 .../dsls/sql/schema/text/BeamTextTable.java     |  6 +--
 .../transform/BeamAggregationTransforms.java    | 26 ++++++-------
 .../dsls/sql/transform/BeamJoinTransforms.java  |  6 +--
 .../dsls/sql/transform/BeamSqlProjectFn.java    | 16 ++++----
 .../beam/dsls/sql/utils/CalciteUtils.java       | 12 +++---
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 14 +++----
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    | 26 ++++++-------
 .../beam/dsls/sql/BeamSqlDslJoinTest.java       | 10 ++---
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    | 10 ++---
 .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java    |  6 +--
 .../org/apache/beam/dsls/sql/TestUtils.java     | 28 +++++++-------
 ...mSqlBuiltinFunctionsIntegrationTestBase.java |  6 +--
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  8 ++--
 .../beam/dsls/sql/mock/MockedBoundedTable.java  | 14 +++----
 .../apache/beam/dsls/sql/mock/MockedTable.java  |  6 +--
 .../dsls/sql/mock/MockedUnboundedTable.java     | 14 +++----
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |  6 +--
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  7 ++--
 .../sql/schema/text/BeamTextCSVTableTest.java   | 17 ++++-----
 .../transform/BeamAggregationTransformTest.java | 13 +++----
 .../schema/transform/BeamTransformBaseTest.java | 12 +++---
 80 files changed, 354 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index e8c8c97..0e1ac98 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
@@ -73,7 +73,7 @@ public class BeamSqlEnv implements Serializable{
    *
    */
   public void registerTable(String tableName, BaseBeamTable table) {
-    schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
+    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
     planner.getSourceTables().put(tableName, table);
   }
 
@@ -85,13 +85,13 @@ public class BeamSqlEnv implements Serializable{
   }
 
   private static class BeamCalciteTable implements ScannableTable, Serializable {
-    private BeamSqlRecordType beamSqlRecordType;
-    public BeamCalciteTable(BeamSqlRecordType beamSqlRecordType) {
-      this.beamSqlRecordType = beamSqlRecordType;
+    private BeamSqlRowType beamSqlRowType;
+    public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+      this.beamSqlRowType = beamSqlRowType;
     }
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return CalciteUtils.toCalciteRecordType(this.beamSqlRecordType)
+      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
           .apply(BeamQueryPlanner.TYPE_FACTORY);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 04fe451..91df2be 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -21,9 +21,9 @@ import java.sql.Types;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -51,7 +51,7 @@ class BeamSqlExample {
     //define the input row format
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
-    BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+    BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
     BeamSqlRow row = new BeamSqlRow(type);
     row.addField(0, 1);
     row.addField(1, "row");

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
index a314bf4..3732933 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -37,7 +37,7 @@ public interface BeamSqlExpressionExecutor extends Serializable {
    * apply transformation to input record {@link BeamSqlRow}.
    *
    */
-  List<Object> execute(BeamSqlRow inputRecord);
+  List<Object> execute(BeamSqlRow inputRow);
 
   void close();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 64bc880..0be918d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -427,10 +427,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
   }
 
   @Override
-  public List<Object> execute(BeamSqlRow inputRecord) {
+  public List<Object> execute(BeamSqlRow inputRow) {
     List<Object> results = new ArrayList<>();
     for (BeamSqlExpression exp : exps) {
-      results.add(exp.evaluate(inputRecord).getValue());
+      results.add(exp.evaluate(inputRow).getValue());
     }
     return results;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
index a15c42e..a30916b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -49,16 +49,16 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     for (int i = 0; i < operands.size() - 1; i += 2) {
-      if (opValueEvaluated(i, inputRecord)) {
+      if (opValueEvaluated(i, inputRow)) {
         return BeamSqlPrimitive.of(
             outputType,
-            opValueEvaluated(i + 1, inputRecord)
+            opValueEvaluated(i + 1, inputRow)
         );
       }
     }
     return BeamSqlPrimitive.of(outputType,
-        opValueEvaluated(operands.size() - 1, inputRecord));
+        opValueEvaluated(operands.size() - 1, inputRow));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
index 7e8ab03..524d1df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -72,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     SqlTypeName castOutputType = getOutputType();
     switch (castOutputType) {
       case INTEGER:
         return BeamSqlPrimitive
-            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
       case DOUBLE:
         return BeamSqlPrimitive
-            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
       case SMALLINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
       case TINYINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
       case BIGINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
       case DECIMAL:
         return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
-            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRecord)));
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
       case FLOAT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
       case CHAR:
       case VARCHAR:
         return BeamSqlPrimitive
-            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRecord).toString());
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
       case DATE:
         return BeamSqlPrimitive
-            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRecord), outputDateFormat));
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
       case TIMESTAMP:
         return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-            toTimeStamp(opValueEvaluated(index, inputRecord), outputTimestampFormat));
+            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
     }
     throw new UnsupportedOperationException(
         String.format("Cast to type %s not supported", castOutputType));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
index 3d96616..5076ccc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -48,9 +48,9 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
-    Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
     switch (operands.get(0).outputType) {
     case BIGINT:
     case DECIMAL:

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 33feb3e..9d2815c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -62,7 +62,7 @@ public abstract class BeamSqlExpression implements Serializable {
    * Apply input record {@link BeamSqlRow} to this expression,
    * the output value is wrapped with {@link BeamSqlPrimitive}.
    */
-  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRecord);
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
 
   public List<BeamSqlExpression> getOperands() {
     return operands;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
index b6d2b0b..710460b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
index e08e737..23d9c83 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -44,8 +44,8 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
index d4e070d..4d3fd45 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -44,8 +44,8 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index c5c80b9..51724bb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
index 783466c..efdb2df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -42,13 +42,13 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression {
         && SqlTypeName.DATETIME_TYPES.contains(opType(0));
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     if (opType(0) == SqlTypeName.TIME) {
-      GregorianCalendar date = opValueEvaluated(0, inputRecord);
+      GregorianCalendar date = opValueEvaluated(0, inputRow);
       return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
 
     } else {
-      Date date = opValueEvaluated(0, inputRecord);
+      Date date = opValueEvaluated(0, inputRow);
       return BeamSqlPrimitive.of(outputType, date.getTime());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
index 6f18307..e389ef9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -51,14 +51,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     if (method == null) {
       reConstructMethod();
     }
     try {
       List<Object> paras = new ArrayList<>();
       for (BeamSqlExpression e : getOperands()) {
-        paras.add(e.evaluate(inputRecord).getValue());
+        paras.add(e.evaluate(inputRow).getValue());
       }
 
       return BeamSqlPrimitive.of(getOutputType(),

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
index 8bc090f..ecc6939 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -34,9 +34,9 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRecord.getWindowEnd().getMillis()));
+        new Date(inputRow.getWindowEnd().getMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
index eb4c03b..71f0672 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -42,9 +42,9 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        (Date) operands.get(0).evaluate(inputRecord).getValue());
+        (Date) operands.get(0).evaluate(inputRow).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
index 1e2c0a2..f3aba2e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -35,9 +35,9 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRecord.getWindowStart().getMillis()));
+        new Date(inputRow.getWindowStart().getMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index eac4c72..d62123c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -50,11 +50,11 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
     super(operands, outputType);
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
     BigDecimal left = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(0, inputRecord).toString()));
+        Double.valueOf(opValueEvaluated(0, inputRow).toString()));
     BigDecimal right = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(1, inputRecord).toString()));
+        Double.valueOf(opValueEvaluated(1, inputRow).toString()));
 
     BigDecimal result = calc(left, right);
     return getCorrectlyTypedResult(result);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index 2f83140..c7df5ab 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -39,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
     return getOperands().size() == 0;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index c15123a..46e5a43 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -45,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
     ret.setTime(new Date());
     return BeamSqlPrimitive.of(outputType, ret);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 0ea12f1..303846d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -43,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 68f1aa9..59e3e9c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -42,8 +42,8 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Date date = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
index 4d446e3..64234f5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -42,8 +42,8 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Date date = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
index bc8ed0f..d41a249 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -61,8 +61,8 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.BIGINT;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Long time = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Long time = opValueEvaluated(1, inputRow);
 
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
index 5da43f4..5f6abe0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -33,10 +33,10 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
     boolean result = true;
     for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
       result = result && expOut.getValue();
       if (!result) {
         break;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
index ffa0184..6df52aa 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -43,8 +43,8 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
     return super.accept();
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Boolean value = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Boolean value = opValueEvaluated(0, inputRow);
     if (value == null) {
       return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
     } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
index 9ca57f0..450638c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -33,10 +33,10 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
     boolean result = false;
     for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
         result = result || expOut.getValue();
         if (result) {
           break;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index f79bcf6..2d444f8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -39,10 +39,10 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
     return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
     BeamSqlExpression leftOp = op(0);
     BeamSqlExpression rightOp = op(1);
-    return calculate(leftOp.evaluate(inputRecord), rightOp.evaluate(inputRecord));
+    return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index a65333c..4733d09 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -45,9 +45,9 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
     return acceptance;
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
     BeamSqlExpression operand = op(0);
-    return calculate(operand.evaluate(inputRecord));
+    return calculate(operand.evaluate(inputRow));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
index 4645951..9db810e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
@@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
     return numberOfOperands() == 0;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 3ed9b80..7c61061 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.INTEGER);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
index e8e4e50..93e1f71 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -52,9 +52,9 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String left = opValueEvaluated(0, inputRecord);
-    String right = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String left = opValueEvaluated(0, inputRow);
+    String right = opValueEvaluated(1, inputRow);
 
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
         new StringBuilder(left.length() + right.length())

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
index 51dfe28..7726e27 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
 
     StringBuilder ret = new StringBuilder(str);
     boolean isInit = true;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
index f70fb1a..cb198ec 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
index 20d9962..cb6a523 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -55,15 +55,15 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
-    String replaceStr = opValueEvaluated(1, inputRecord);
-    int idx = opValueEvaluated(2, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    String replaceStr = opValueEvaluated(1, inputRow);
+    int idx = opValueEvaluated(2, inputRow);
     // the index is 1 based.
     idx -= 1;
     int length = replaceStr.length();
     if (operands.size() == 4) {
-      length = opValueEvaluated(3, inputRecord);
+      length = opValueEvaluated(3, inputRow);
     }
 
     StringBuilder result = new StringBuilder(

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
index 1d09b51..144acbf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -57,12 +57,12 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String targetStr = opValueEvaluated(0, inputRecord);
-    String containingStr = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String targetStr = opValueEvaluated(0, inputRow);
+    String containingStr = opValueEvaluated(1, inputRow);
     int from = -1;
     if (operands.size() == 3) {
-      Number tmp = opValueEvaluated(2, inputRecord);
+      Number tmp = opValueEvaluated(2, inputRow);
       from = tmp.intValue();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
index d9bbc98..8b33125 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -55,9 +55,9 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
-    int idx = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    int idx = opValueEvaluated(1, inputRow);
     int startIdx = idx;
     if (startIdx > 0) {
       // NOTE: SQL substring is 1 based(rather than 0 based)
@@ -70,7 +70,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
     }
 
     if (operands.size() == 3) {
-      int length = opValueEvaluated(2, inputRecord);
+      int length = opValueEvaluated(2, inputRow);
       if (length < 0) {
         length = 0;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
index ac4d060..5e6c2bb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -58,14 +58,14 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     if (operands.size() == 1) {
       return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-          opValueEvaluated(0, inputRecord).toString().trim());
+          opValueEvaluated(0, inputRow).toString().trim());
     } else {
-      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRecord);
-      String targetStr = opValueEvaluated(1, inputRecord);
-      String containingStr = opValueEvaluated(2, inputRecord);
+      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
+      String targetStr = opValueEvaluated(1, inputRow);
+      String containingStr = opValueEvaluated(2, inputRow);
 
       switch (type) {
         case LEADING:

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
index 8fcaca4..efa9c95 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 5389ec7..9dcb079 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -20,9 +20,9 @@ package org.apache.beam.dsls.sql.rel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -105,13 +105,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
         stageName + "combineBy",
         Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
             new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
-                CalciteUtils.toBeamRecordType(input.getRowType()))))
+                CalciteUtils.toBeamRowType(input.getRowType()))))
         .setCoder(KvCoder.of(keyCoder, aggCoder));
 
     PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            CalciteUtils.toBeamRecordType(getRowType()), getAggCallList(), windowFieldIdx)));
-    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+            CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
+    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return mergedStream;
   }
@@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-rowrecord used as Group-By keys.
    */
-  private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType);
+  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
-        fieldNames.add(inputRecordType.getFieldsName().get(i));
-        fieldTypes.add(inputRecordType.getFieldsType().get(i));
+        fieldNames.add(inputRowType.getFieldsName().get(i));
+        fieldTypes.add(inputRowType.getFieldsType().get(i));
       }
     }
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Type of sub-rowrecord, that represents the list of aggregation fields.
    */
-  private BeamSqlRecordType exAggFieldsSchema() {
+  private BeamSqlRowType exAggFieldsSchema() {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
       fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
     }
 
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 07b5c7c..f802104 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -62,7 +62,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
 
     PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
         ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+    filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return filterStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index b26d2b8..6754991 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -56,7 +56,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
       //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
       BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
       return sourceTable.buildIOReader(inputPCollections.getPipeline())
-          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
index 3c92e42..3ebf152 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.Coder;
@@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
       BeamSqlEnv sqlEnv)
       throws Exception {
     BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType());
+    BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
     PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
 
     final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
@@ -119,7 +119,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
       names.add("c" + i);
       types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
     }
-    BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+    BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
 
     Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
 
@@ -213,7 +213,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     PCollection<BeamSqlRow> ret = joinedRows
         .apply(stageName + "_JoinParts2WholeRow",
             MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
     return ret;
   }
 
@@ -249,13 +249,13 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     PCollection<BeamSqlRow> ret = leftRows
         .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
             joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return ret;
   }
 
   private BeamSqlRow buildNullRow(BeamRelNode relNode) {
-    BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType());
+    BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
     BeamSqlRow nullRow = new BeamSqlRow(leftType);
     for (int i = 0; i < leftType.size(); i++) {
       nullRow.addField(i, null);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 2cdfc72..8f8e5ce 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -72,8 +72,8 @@ public class BeamProjectRel extends Project implements BeamRelNode {
 
     PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSqlProjectFn(getRelTypeName(), executor,
-            CalciteUtils.toBeamRecordType(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+            CalciteUtils.toBeamRowType(rowType))));
+    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return projectStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 75f9717..ba344df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -149,7 +149,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
 
     PCollection<BeamSqlRow> orderedStream = rawStream.apply(
         "flatten", Flatten.<BeamSqlRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return orderedStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index 030d2c8..43b74c3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -19,13 +19,12 @@
 package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.collect.ImmutableList;
-
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Create;
@@ -65,9 +64,9 @@ public class BeamValuesRel extends Values implements BeamRelNode {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(this.getRowType());
+    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
+      BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
       for (int i = 0; i < tuple.size(); i++) {
         BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
       }
@@ -75,6 +74,6 @@ public class BeamValuesRel extends Values implements BeamRelNode {
     }
 
     return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
-        .setCoder(new BeamSqlRowCoder(beamSQLRecordType));
+        .setCoder(new BeamSqlRowCoder(beamSQLRowType));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 6d49bcc..dfa2785 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
  * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
  */
 public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
-  protected BeamSqlRecordType beamSqlRecordType;
-  public BaseBeamTable(BeamSqlRecordType beamSqlRecordType) {
-    this.beamSqlRecordType = beamSqlRecordType;
+  protected BeamSqlRowType beamSqlRowType;
+  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+    this.beamSqlRowType = beamSqlRowType;
   }
 
-  @Override public BeamSqlRecordType getRecordType() {
-    return beamSqlRecordType;
+  @Override public BeamSqlRowType getRowType() {
+    return beamSqlRowType;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index 8309097..5b63780 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -31,13 +31,13 @@ public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
   private transient PCollection<BeamSqlRow> upstream;
 
-  protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
-      BeamSqlRecordType beamSqlRecordType){
-    this(beamSqlRecordType);
+      BeamSqlRowType beamSqlRowType){
+    this(beamSqlRowType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
     this.upstream = upstream;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
deleted file mode 100644
index 52bd652..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Field type information in {@link BeamSqlRow}.
- *
- */
-@AutoValue
-public abstract class BeamSqlRecordType implements Serializable {
-  public abstract List<String> getFieldsName();
-  public abstract List<Integer> getFieldsType();
-
-  public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
-    return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
-  }
-
-  public int size() {
-    return getFieldsName().size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 5c0dbc0..d789446 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -61,12 +61,12 @@ public class BeamSqlRow implements Serializable {
 
   private List<Integer> nullFields = new ArrayList<>();
   private List<Object> dataValues;
-  private BeamSqlRecordType dataType;
+  private BeamSqlRowType dataType;
 
   private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
   private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
-  public BeamSqlRow(BeamSqlRecordType dataType) {
+  public BeamSqlRow(BeamSqlRowType dataType) {
     this.dataType = dataType;
     this.dataValues = new ArrayList<>();
     for (int idx = 0; idx < dataType.size(); ++idx) {
@@ -75,7 +75,7 @@ public class BeamSqlRow implements Serializable {
     }
   }
 
-  public BeamSqlRow(BeamSqlRecordType dataType, List<Object> dataValues) {
+  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
     this(dataType);
     for (int idx = 0; idx < dataValues.size(); ++idx) {
       addField(idx, dataValues.get(idx));
@@ -237,11 +237,11 @@ public class BeamSqlRow implements Serializable {
     this.dataValues = dataValues;
   }
 
-  public BeamSqlRecordType getDataType() {
+  public BeamSqlRowType getDataType() {
     return dataType;
   }
 
-  public void setDataType(BeamSqlRecordType dataType) {
+  public void setDataType(BeamSqlRowType dataType) {
     this.dataType = dataType;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index c798b35..f14864a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
  *  A {@link Coder} encodes {@link BeamSqlRow}.
  */
 public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
-  private BeamSqlRecordType tableSchema;
+  private BeamSqlRowType tableSchema;
 
   private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
 
@@ -52,7 +52,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
   private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
   private static final ByteCoder byteCoder = ByteCoder.of();
 
-  public BeamSqlRowCoder(BeamSqlRecordType tableSchema) {
+  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
     this.tableSchema = tableSchema;
   }
 
@@ -174,7 +174,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
     return record;
   }
 
-  public BeamSqlRecordType getTableSchema() {
+  public BeamSqlRowType getTableSchema() {
     return tableSchema;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..1129bdd
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+  public abstract List<String> getFieldsName();
+  public abstract List<Integer> getFieldsType();
+
+  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+    return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+  }
+
+  public int size() {
+    return getFieldsName().size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
index 986decb..d419473 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
@@ -48,5 +48,5 @@ public interface BeamSqlTable {
   /**
    * Get the schema info of the table.
    */
-   BeamSqlRecordType getRecordType();
+   BeamSqlRowType getRowType();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index 7157793..4b7e76b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -37,19 +37,19 @@ public final class BeamTableUtils {
   public static BeamSqlRow csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSqlRecordType beamSqlRecordType) {
-    BeamSqlRow row = new BeamSqlRow(beamSqlRecordType);
+      BeamSqlRowType beamSqlRowType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
 
-      if (rawRecord.size() != beamSqlRecordType.size()) {
+      if (rawRecord.size() != beamSqlRowType.size()) {
         throw new IllegalArgumentException(String.format(
             "Expect %d fields, but actually %d",
-            beamSqlRecordType.size(), rawRecord.size()
+            beamSqlRowType.size(), rawRecord.size()
         ));
       } else {
-        for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+        for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
           String raw = rawRecord.get(idx);
           addFieldWithAutoTypeCasting(row, idx, raw);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 39cf8d8..a18f3de 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -21,9 +21,8 @@ import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
 import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.util.List;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -37,27 +36,27 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
   private CSVFormat csvFormat;
-  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
-    this(beamSqlRecordType, bootstrapServers, topics, CSVFormat.DEFAULT);
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
   }
 
-  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
       List<String> topics, CSVFormat format) {
-    super(beamSqlRecordType, bootstrapServers, topics);
+    super(beamSqlRowType, bootstrapServers, topics);
     this.csvFormat = format;
   }
 
   @Override
   public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
       getPTransformForInput() {
-    return new CsvRecorderDecoder(beamSqlRecordType, csvFormat);
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
   }
 
   @Override
   public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput() {
-    return new CsvRecorderEncoder(beamSqlRecordType, csvFormat);
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
   }
 
   /**
@@ -66,10 +65,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
    */
   public static class CsvRecorderDecoder
       extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
-    private BeamSqlRecordType recordType;
+    private BeamSqlRowType rowType;
     private CSVFormat format;
-    public CsvRecorderDecoder(BeamSqlRecordType recordType, CSVFormat format) {
-      this.recordType = recordType;
+    public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
       this.format = format;
     }
 
@@ -79,7 +78,7 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
         @ProcessElement
         public void processElement(ProcessContext c) {
           String rowInString = new String(c.element().getValue());
-          c.output(csvLine2BeamSqlRow(format, rowInString, recordType));
+          c.output(csvLine2BeamSqlRow(format, rowInString, rowType));
         }
       }));
     }
@@ -91,10 +90,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
    */
   public static class CsvRecorderEncoder
       extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
-    private BeamSqlRecordType recordType;
+    private BeamSqlRowType rowType;
     private CSVFormat format;
-    public CsvRecorderEncoder(BeamSqlRecordType recordType, CSVFormat format) {
-      this.recordType = recordType;
+    public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
       this.format = format;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index f27014e..faa2706 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -22,11 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -49,13 +48,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   private List<String> topics;
   private Map<String, Object> configUpdates;
 
-  protected BeamKafkaTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
-  public BeamKafkaTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+  public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
-    super(beamSqlRecordType);
+    super(beamSqlRowType);
     this.bootstrapServers = bootstrapServers;
     this.topics = topics;
   }


[3/3] beam git commit: This closes #3571

Posted by lz...@apache.org.
This closes #3571


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4d615a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4d615a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4d615a7

Branch: refs/heads/DSL_SQL
Commit: d4d615a7237729f5e59085ae2bd77541c6ea7ddb
Parents: a1f7cf6 a9c8a8a
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jul 19 10:06:04 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Wed Jul 19 10:06:04 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    | 12 +++---
 .../beam/dsls/sql/example/BeamSqlExample.java   |  4 +-
 .../interpreter/BeamSqlExpressionExecutor.java  |  2 +-
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |  4 +-
 .../operator/BeamSqlCaseExpression.java         |  8 ++--
 .../operator/BeamSqlCastExpression.java         | 22 +++++------
 .../operator/BeamSqlCompareExpression.java      |  6 +--
 .../interpreter/operator/BeamSqlExpression.java |  2 +-
 .../operator/BeamSqlInputRefExpression.java     |  4 +-
 .../operator/BeamSqlIsNotNullExpression.java    |  4 +-
 .../operator/BeamSqlIsNullExpression.java       |  4 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |  2 +-
 .../operator/BeamSqlReinterpretExpression.java  |  6 +--
 .../operator/BeamSqlUdfExpression.java          |  4 +-
 .../operator/BeamSqlWindowEndExpression.java    |  4 +-
 .../operator/BeamSqlWindowExpression.java       |  4 +-
 .../operator/BeamSqlWindowStartExpression.java  |  4 +-
 .../arithmetic/BeamSqlArithmeticExpression.java |  6 +--
 .../date/BeamSqlCurrentDateExpression.java      |  2 +-
 .../date/BeamSqlCurrentTimeExpression.java      |  2 +-
 .../date/BeamSqlCurrentTimestampExpression.java |  2 +-
 .../date/BeamSqlDateCeilExpression.java         |  4 +-
 .../date/BeamSqlDateFloorExpression.java        |  4 +-
 .../operator/date/BeamSqlExtractExpression.java |  4 +-
 .../operator/logical/BeamSqlAndExpression.java  |  4 +-
 .../operator/logical/BeamSqlNotExpression.java  |  4 +-
 .../operator/logical/BeamSqlOrExpression.java   |  4 +-
 .../math/BeamSqlMathBinaryExpression.java       |  4 +-
 .../math/BeamSqlMathUnaryExpression.java        |  4 +-
 .../operator/math/BeamSqlPiExpression.java      |  2 +-
 .../string/BeamSqlCharLengthExpression.java     |  4 +-
 .../string/BeamSqlConcatExpression.java         |  6 +--
 .../string/BeamSqlInitCapExpression.java        |  4 +-
 .../operator/string/BeamSqlLowerExpression.java |  4 +-
 .../string/BeamSqlOverlayExpression.java        | 10 ++---
 .../string/BeamSqlPositionExpression.java       |  8 ++--
 .../string/BeamSqlSubstringExpression.java      |  8 ++--
 .../operator/string/BeamSqlTrimExpression.java  | 10 ++---
 .../operator/string/BeamSqlUpperExpression.java |  4 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 22 +++++------
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  2 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  2 +-
 .../apache/beam/dsls/sql/rel/BeamJoinRel.java   | 12 +++---
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  4 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  2 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  9 ++---
 .../beam/dsls/sql/schema/BaseBeamTable.java     | 10 ++---
 .../dsls/sql/schema/BeamPCollectionTable.java   |  8 ++--
 .../beam/dsls/sql/schema/BeamSqlRecordType.java | 40 --------------------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 10 ++---
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  6 +--
 .../beam/dsls/sql/schema/BeamSqlRowType.java    | 40 ++++++++++++++++++++
 .../beam/dsls/sql/schema/BeamSqlTable.java      |  2 +-
 .../beam/dsls/sql/schema/BeamTableUtils.java    | 10 ++---
 .../sql/schema/kafka/BeamKafkaCSVTable.java     | 29 +++++++-------
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 11 +++---
 .../dsls/sql/schema/text/BeamTextCSVTable.java  | 14 +++----
 .../schema/text/BeamTextCSVTableIOReader.java   | 11 +++---
 .../schema/text/BeamTextCSVTableIOWriter.java   |  9 ++---
 .../dsls/sql/schema/text/BeamTextTable.java     |  6 +--
 .../transform/BeamAggregationTransforms.java    | 26 ++++++-------
 .../dsls/sql/transform/BeamJoinTransforms.java  |  6 +--
 .../dsls/sql/transform/BeamSqlProjectFn.java    | 16 ++++----
 .../beam/dsls/sql/utils/CalciteUtils.java       | 12 +++---
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 14 +++----
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    | 26 ++++++-------
 .../beam/dsls/sql/BeamSqlDslJoinTest.java       | 10 ++---
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    | 10 ++---
 .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java    |  6 +--
 .../org/apache/beam/dsls/sql/TestUtils.java     | 28 +++++++-------
 ...mSqlBuiltinFunctionsIntegrationTestBase.java |  6 +--
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  8 ++--
 .../beam/dsls/sql/mock/MockedBoundedTable.java  | 14 +++----
 .../apache/beam/dsls/sql/mock/MockedTable.java  |  6 +--
 .../dsls/sql/mock/MockedUnboundedTable.java     | 14 +++----
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |  6 +--
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  7 ++--
 .../sql/schema/text/BeamTextCSVTableTest.java   | 17 ++++-----
 .../transform/BeamAggregationTransformTest.java | 13 +++----
 .../schema/transform/BeamTransformBaseTest.java | 12 +++---
 80 files changed, 354 insertions(+), 362 deletions(-)
----------------------------------------------------------------------