You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:48 UTC

[5/7] beam git commit: refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
index ea5f749..06dce91 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -56,7 +56,7 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String targetStr = opValueEvaluated(0, inputRow);
     String containingStr = opValueEvaluated(1, inputRow);
     int from = -1;

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
index 25f205a..f8582aa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -54,7 +54,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String str = opValueEvaluated(0, inputRow);
     int idx = opValueEvaluated(1, inputRow);
     int startIdx = idx;

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
index 9493e24..9c2a7ae 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.fun.SqlTrimFunction;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -58,7 +58,7 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     if (operands.size() == 1) {
       return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
           opValueEvaluated(0, inputRow).toString().trim());

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
index 9769c0e..94ac2e2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -32,7 +32,7 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index dd01a87..b421bc3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -107,7 +107,7 @@ public class BeamQueryPlanner {
    * which is linked with the given {@code pipeline}. The final output stream is returned as
    * {@code PCollection} so more operations can be applied.
    */
-  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
+  public PCollection<BeamRecord> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
       , BeamSqlEnv sqlEnv) throws Exception {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 8e78684..d91b484 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -19,13 +19,12 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -55,7 +55,7 @@ import org.joda.time.Duration;
  */
 public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   private int windowFieldIdx = -1;
-  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
+  private WindowFn<BeamRecord, BoundedWindow> windowFn;
   private Trigger trigger;
   private Duration allowedLatence = Duration.ZERO;
 
@@ -71,12 +71,12 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this) + "_";
 
-    PCollection<BeamSqlRow> upstream =
+    PCollection<BeamRecord> upstream =
         BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
     if (windowFieldIdx != -1) {
       upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
@@ -84,14 +84,14 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
           .setCoder(upstream.getCoder());
     }
 
-    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
+    PCollection<BeamRecord> windowStream = upstream.apply(stageName + "window",
         Window.into(windowFn)
         .triggering(trigger)
         .withAllowedLateness(allowedLatence)
         .accumulatingFiredPanes());
 
-    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
+    BeamRecordCoder keyCoder = exKeyFieldsSchema(input.getRowType()).getRecordCoder();
+    PCollection<KV<BeamRecord, BeamRecord>> exCombineByStream = windowStream.apply(
         stageName + "exCombineBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
@@ -99,19 +99,19 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
         .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
 
 
-    BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
+    BeamRecordCoder aggCoder = exAggFieldsSchema().getRecordCoder();
 
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
+    PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = exCombineByStream.apply(
         stageName + "combineBy",
-        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
+        Combine.<BeamRecord, BeamRecord, BeamRecord>perKey(
             new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
                 CalciteUtils.toBeamRowType(input.getRowType()))))
         .setCoder(KvCoder.of(keyCoder, aggCoder));
 
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
+    PCollection<BeamRecord> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
             CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
-    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    mergedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
 
     return mergedStream;
   }
@@ -119,8 +119,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-rowrecord used as Group-By keys.
    */
-  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
+  private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRecordType inputRowType = CalciteUtils.toBeamRowType(relDataType);
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (int i : groupSet.asList()) {
@@ -129,13 +129,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
         fieldTypes.add(inputRowType.getFieldsType().get(i));
       }
     }
-    return BeamSqlRowType.create(fieldNames, fieldTypes);
+    return BeamSqlRecordType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Type of sub-rowrecord, that represents the list of aggregation fields.
    */
-  private BeamSqlRowType exAggFieldsSchema() {
+  private BeamSqlRecordType exAggFieldsSchema() {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
       fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
     }
 
-    return BeamSqlRowType.create(fieldNames, fieldTypes);
+    return BeamSqlRecordType.create(fieldNames, fieldTypes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
index b453db4..8fe5be4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -22,9 +22,8 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExec
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -50,19 +49,19 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSqlRow> upstream =
+    PCollection<BeamRecord> upstream =
         BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
 
     BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
-    PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
+    PCollection<BeamRecord> filterStream = upstream.apply(stageName,
         ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
 
     return filterStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index d5eb210..1e3eb4c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -21,7 +21,7 @@ import com.google.common.base.Joiner;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -55,12 +55,12 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
    * which is the persisted PCollection.
    */
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSqlRow> upstream =
+    PCollection<BeamRecord> upstream =
         BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 5179eba..254f990 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -21,8 +21,7 @@ import com.google.common.base.Joiner;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -42,21 +41,21 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
+    TupleTag<BeamRecord> sourceTupleTag = new TupleTag<>(sourceName);
     if (inputPCollections.has(sourceTupleTag)) {
       //choose PCollection from input PCollectionTuple if exists there.
-      PCollection<BeamSqlRow> sourceStream = inputPCollections
-          .get(new TupleTag<BeamSqlRow>(sourceName));
+      PCollection<BeamRecord> sourceStream = inputPCollections
+          .get(new TupleTag<BeamRecord>(sourceName));
       return sourceStream;
     } else {
       //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
       BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
       return sourceTable.buildIOReader(inputPCollections.getPipeline())
-          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+          .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index d6ab52d..5919329 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -51,7 +51,7 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode {
     return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 2de2a89..9e5ce2f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -27,14 +27,13 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -93,15 +92,15 @@ public class BeamJoinRel extends Join implements BeamRelNode {
         joinType);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+  @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections,
       BeamSqlEnv sqlEnv)
       throws Exception {
     BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
-    PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    BeamSqlRecordType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+    PCollection<BeamRecord> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
 
     final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    PCollection<BeamRecord> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
 
     String stageName = BeamSqlRelUtils.getStageName(this);
     WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
@@ -119,24 +118,24 @@ public class BeamJoinRel extends Join implements BeamRelNode {
       names.add("c" + i);
       types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
     }
-    BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
+    BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
 
-    Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
+    Coder extractKeyRowCoder = extractKeyRowType.getRecordCoder();
 
     // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
+    PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows = leftRows
         .apply(stageName + "_left_ExtractJoinFields",
             MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
         .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
 
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
+    PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows = rightRows
         .apply(stageName + "_right_ExtractJoinFields",
             MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
         .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
 
     // prepare the NullRows
-    BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
-    BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
+    BeamRecord leftNullRow = buildNullRow(leftRelNode);
+    BeamRecord rightNullRow = buildNullRow(rightRelNode);
 
     // a regular join
     if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
@@ -184,11 +183,11 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     }
   }
 
-  private PCollection<BeamSqlRow> standardJoin(
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
-      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
-    PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
+  private PCollection<BeamRecord> standardJoin(
+      PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows,
+      PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows,
+      BeamRecord leftNullRow, BeamRecord rightNullRow, String stageName) {
+    PCollection<KV<BeamRecord, KV<BeamRecord, BeamRecord>>> joinedRows = null;
     switch (joinType) {
       case LEFT:
         joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
@@ -210,53 +209,53 @@ public class BeamJoinRel extends Join implements BeamRelNode {
         break;
     }
 
-    PCollection<BeamSqlRow> ret = joinedRows
+    PCollection<BeamRecord> ret = joinedRows
         .apply(stageName + "_JoinParts2WholeRow",
             MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+        .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
     return ret;
   }
 
-  public PCollection<BeamSqlRow> sideInputJoin(
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
-      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
+  public PCollection<BeamRecord> sideInputJoin(
+      PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows,
+      PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows,
+      BeamRecord leftNullRow, BeamRecord rightNullRow) {
     // we always make the Unbounded table on the left to do the sideInput join
     // (will convert the result accordingly before return)
     boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
     JoinRelType realJoinType =
         (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
 
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
+    PCollection<KV<BeamRecord, BeamRecord>> realLeftRows =
         swapped ? extractedRightRows : extractedLeftRows;
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
+    PCollection<KV<BeamRecord, BeamRecord>> realRightRows =
         swapped ? extractedLeftRows : extractedRightRows;
-    BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
+    BeamRecord realRightNullRow = swapped ? leftNullRow : rightNullRow;
 
     // swapped still need to pass down because, we need to swap the result back.
     return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
         realRightNullRow, swapped);
   }
 
-  private PCollection<BeamSqlRow> sideInputJoinHelper(
+  private PCollection<BeamRecord> sideInputJoinHelper(
       JoinRelType joinType,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
-      BeamSqlRow rightNullRow, boolean swapped) {
-    final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
-        .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
+      PCollection<KV<BeamRecord, BeamRecord>> leftRows,
+      PCollection<KV<BeamRecord, BeamRecord>> rightRows,
+      BeamRecord rightNullRow, boolean swapped) {
+    final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows
+        .apply(View.<BeamRecord, BeamRecord>asMultimap());
 
-    PCollection<BeamSqlRow> ret = leftRows
+    PCollection<BeamRecord> ret = leftRows
         .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
             joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+        .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
 
     return ret;
   }
 
-  private BeamSqlRow buildNullRow(BeamRelNode relNode) {
-    BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
-    BeamSqlRow nullRow = new BeamSqlRow(leftType);
+  private BeamRecord buildNullRow(BeamRelNode relNode) {
+    BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
+    BeamRecord nullRow = new BeamRecord(leftType);
     for (int i = 0; i < leftType.size(); i++) {
       nullRow.addField(i, null);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 0075d3a..b55252a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -49,7 +49,7 @@ public class BeamMinusRel extends Minus implements BeamRelNode {
     return new BeamMinusRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
index 6ccb156..b1ff629 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -23,9 +23,8 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExec
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -60,20 +59,20 @@ public class BeamProjectRel extends Project implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSqlRow> upstream =
+    PCollection<BeamRecord> upstream =
         BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
 
     BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
-    PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
+    PCollection<BeamRecord> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSqlProjectFn(getRelTypeName(), executor,
             CalciteUtils.toBeamRowType(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
 
     return projectStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 8a51cc7..b8b4293 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.rel.RelNode;
@@ -33,6 +33,6 @@ public interface BeamRelNode extends RelNode {
    * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
    * algorithm.
    */
-  PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+  PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
       throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index 44e4338..f9cbf4f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -22,13 +22,13 @@ import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -62,11 +62,11 @@ public class BeamSetOperatorRelBase {
     this.all = all;
   }
 
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
-    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
+    PCollection<BeamRecord> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
         .buildBeamPipeline(inputPCollections, sqlEnv);
-    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
+    PCollection<BeamRecord> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
         .buildBeamPipeline(inputPCollections, sqlEnv);
 
     WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
@@ -77,20 +77,20 @@ public class BeamSetOperatorRelBase {
           + leftWindow + " VS " + rightWindow);
     }
 
-    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
-    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
+    final TupleTag<BeamRecord> leftTag = new TupleTag<>();
+    final TupleTag<BeamRecord> rightTag = new TupleTag<>();
 
     // co-group
     String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
-    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
+    PCollection<KV<BeamRecord, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
         .of(leftTag, leftRows.apply(
             stageName + "_CreateLeftIndex", MapElements.via(
                 new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
         .and(rightTag, rightRows.apply(
             stageName + "_CreateRightIndex", MapElements.via(
                 new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
-        .apply(CoGroupByKey.<BeamSqlRow>create());
-    PCollection<BeamSqlRow> ret = coGbkResultCollection
+        .apply(CoGroupByKey.<BeamRecord>create());
+    PCollection<BeamRecord> ret = coGbkResultCollection
         .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
             opType, all)));
     return ret;

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 4ea12ca..0cbea5c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -27,13 +27,13 @@ import java.util.List;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -120,10 +120,10 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     }
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
-    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
+    PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input)
         .buildBeamPipeline(inputPCollections, sqlEnv);
     Type windowType = upstream.getWindowingStrategy().getWindowFn()
         .getWindowTypeDescriptor().getType();
@@ -135,21 +135,21 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
         nullsFirst);
     // first find the top (offset + count)
-    PCollection<List<BeamSqlRow>> rawStream =
+    PCollection<List<BeamRecord>> rawStream =
         upstream.apply("extractTopOffsetAndFetch",
             Top.of(startIndex + count, comparator).withoutDefaults())
-        .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+        .setCoder(ListCoder.<BeamRecord>of(upstream.getCoder()));
 
     // strip the `leading offset`
     if (startIndex > 0) {
       rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
-          new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
-          .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+          new SubListFn<BeamRecord>(startIndex, startIndex + count)))
+          .setCoder(ListCoder.<BeamRecord>of(upstream.getCoder()));
     }
 
-    PCollection<BeamSqlRow> orderedStream = rawStream.apply(
-        "flatten", Flatten.<BeamSqlRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    PCollection<BeamRecord> orderedStream = rawStream.apply(
+        "flatten", Flatten.<BeamRecord>iterables());
+    orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
 
     return orderedStream;
   }
@@ -174,7 +174,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
   }
 
-  private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
+  private static class BeamSqlRowComparator implements Comparator<BeamRecord>, Serializable {
     private List<Integer> fieldsIndices;
     private List<Boolean> orientation;
     private List<Boolean> nullsFirst;
@@ -187,11 +187,12 @@ public class BeamSortRel extends Sort implements BeamRelNode {
       this.nullsFirst = nullsFirst;
     }
 
-    @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
+    @Override public int compare(BeamRecord row1, BeamRecord row2) {
       for (int i = 0; i < fieldsIndices.size(); i++) {
         int fieldIndex = fieldsIndices.get(i);
         int fieldRet = 0;
-        SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
+        SqlTypeName fieldType = CalciteUtils.getFieldType(
+            BeamSqlRecordHelper.getSqlRecordType(row1), fieldIndex);
         // whether NULL should be ordered first or last(compared to non-null values) depends on
         // what user specified in SQL(NULLS FIRST/NULLS LAST)
         if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
@@ -203,28 +204,16 @@ public class BeamSortRel extends Sort implements BeamRelNode {
         } else {
           switch (fieldType) {
             case TINYINT:
-              fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
-              break;
             case SMALLINT:
-              fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
-              break;
             case INTEGER:
-              fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
-              break;
             case BIGINT:
-              fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
-              break;
             case FLOAT:
-              fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
-              break;
             case DOUBLE:
-              fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
-              break;
             case VARCHAR:
-              fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
-              break;
             case DATE:
-              fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
+              Comparable v1 = (Comparable) row1.getFieldValue(fieldIndex);
+              Comparable v2 = (Comparable) row2.getFieldValue(fieldIndex);
+              fieldRet = v1.compareTo(v2);
               break;
             default:
               throw new UnsupportedOperationException(
@@ -241,7 +230,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     }
   }
 
-  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+  public static <T extends Comparable> int compare(T a, T b) {
     return a.compareTo(b);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index d35fa67..63ebdf3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -81,7 +81,7 @@ public class BeamUnionRel extends Union implements BeamRelNode {
     return new BeamUnionRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index f12cbbc..8ad6e8d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -23,11 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
@@ -56,17 +55,17 @@ public class BeamValuesRel extends Values implements BeamRelNode {
 
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+  @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
-    List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
+    List<BeamRecord> rows = new ArrayList<>(tuples.size());
     String stageName = BeamSqlRelUtils.getStageName(this);
     if (tuples.isEmpty()) {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
+    BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
+      BeamRecord row = new BeamRecord(beamSQLRowType);
       for (int i = 0; i < tuple.size(); i++) {
         BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
       }
@@ -74,6 +73,6 @@ public class BeamValuesRel extends Values implements BeamRelNode {
     }
 
     return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
-        .setCoder(new BeamSqlRowCoder(beamSQLRowType));
+        .setCoder(beamSQLRowType.getRecordCoder());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index 095875f..dab79a2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -35,13 +35,14 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
@@ -56,12 +57,12 @@ public class BeamAggregationTransforms implements Serializable{
   /**
    * Merge KV to single record.
    */
-  public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
-    private BeamSqlRowType outRowType;
+  public static class MergeAggregationRecord extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> {
+    private BeamSqlRecordType outRowType;
     private List<String> aggFieldNames;
     private int windowStartFieldIdx;
 
-    public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
+    public MergeAggregationRecord(BeamSqlRecordType outRowType, List<AggregateCall> aggList
         , int windowStartFieldIdx) {
       this.outRowType = outRowType;
       this.aggFieldNames = new ArrayList<>();
@@ -73,10 +74,10 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamSqlRow outRecord = new BeamSqlRow(outRowType);
+      BeamRecord outRecord = new BeamRecord(outRowType);
       outRecord.updateWindowRange(c.element().getKey(), window);
 
-      KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
+      KV<BeamRecord, BeamRecord> kvRecord = c.element();
       for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
         outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
       }
@@ -95,7 +96,7 @@ public class BeamAggregationTransforms implements Serializable{
    * extract group-by fields.
    */
   public static class AggregationGroupByKeyFn
-      implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
+      implements SerializableFunction<BeamRecord, BeamRecord> {
     private List<Integer> groupByKeys;
 
     public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
@@ -108,9 +109,9 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSqlRow apply(BeamSqlRow input) {
-      BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
-      BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
+    public BeamRecord apply(BeamRecord input) {
+      BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
+      BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
       keyOfRecord.updateWindowRange(input, null);
 
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
@@ -119,21 +120,21 @@ public class BeamAggregationTransforms implements Serializable{
       return keyOfRecord;
     }
 
-    private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
+    private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
       List<String> fieldNames = new ArrayList<>();
       List<Integer> fieldTypes = new ArrayList<>();
       for (int idx : groupByKeys) {
         fieldNames.add(dataType.getFieldsName().get(idx));
         fieldTypes.add(dataType.getFieldsType().get(idx));
       }
-      return BeamSqlRowType.create(fieldNames, fieldTypes);
+      return BeamSqlRecordType.create(fieldNames, fieldTypes);
     }
   }
 
   /**
    * Assign event timestamp.
    */
-  public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
+  public static class WindowTimestampFn implements SerializableFunction<BeamRecord, Instant> {
     private int windowFieldIdx = -1;
 
     public WindowTimestampFn(int windowFieldIdx) {
@@ -142,7 +143,7 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public Instant apply(BeamSqlRow input) {
+    public Instant apply(BeamRecord input) {
       return new Instant(input.getDate(windowFieldIdx).getTime());
     }
   }
@@ -151,13 +152,13 @@ public class BeamAggregationTransforms implements Serializable{
    * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}.
    */
   public static class AggregationAdaptor
-    extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
+    extends CombineFn<BeamRecord, AggregationAccumulator, BeamRecord> {
     private List<BeamSqlUdaf> aggregators;
     private List<BeamSqlExpression> sourceFieldExps;
-    private BeamSqlRowType finalRowType;
+    private BeamSqlRecordType finalRowType;
 
     public AggregationAdaptor(List<AggregateCall> aggregationCalls,
-        BeamSqlRowType sourceRowType) {
+        BeamSqlRecordType sourceRowType) {
       aggregators = new ArrayList<>();
       sourceFieldExps = new ArrayList<>();
       List<String> outFieldsName = new ArrayList<>();
@@ -206,7 +207,7 @@ public class BeamAggregationTransforms implements Serializable{
           break;
         }
       }
-      finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
+      finalRowType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
     }
     @Override
     public AggregationAccumulator createAccumulator() {
@@ -217,7 +218,7 @@ public class BeamAggregationTransforms implements Serializable{
       return initialAccu;
     }
     @Override
-    public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
+    public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamRecord input) {
       AggregationAccumulator deltaAcc = new AggregationAccumulator();
       for (int idx = 0; idx < aggregators.size(); ++idx) {
         deltaAcc.accumulatorElements.add(
@@ -240,8 +241,8 @@ public class BeamAggregationTransforms implements Serializable{
       return deltaAcc;
     }
     @Override
-    public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
-      BeamSqlRow result = new BeamSqlRow(finalRowType);
+    public BeamRecord extractOutput(AggregationAccumulator accumulator) {
+      BeamRecord result = new BeamRecord(finalRowType);
       for (int idx = 0; idx < aggregators.size(); ++idx) {
         result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
       }
@@ -249,7 +250,7 @@ public class BeamAggregationTransforms implements Serializable{
     }
     @Override
     public Coder<AggregationAccumulator> getAccumulatorCoder(
-        CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
+        CoderRegistry registry, Coder<BeamRecord> inputCoder)
         throws CannotProvideCoderException {
       registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
       List<Coder> aggAccuCoderList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index e0898d1..105bbf3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -22,10 +22,11 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -40,7 +41,7 @@ public class BeamJoinTransforms {
    * A {@code SimpleFunction} to extract join fields from the specified row.
    */
   public static class ExtractJoinFields
-      extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+      extends SimpleFunction<BeamRecord, KV<BeamRecord, BeamRecord>> {
     private final boolean isLeft;
     private final List<Pair<Integer, Integer>> joinColumns;
 
@@ -49,7 +50,7 @@ public class BeamJoinTransforms {
       this.joinColumns = joinColumns;
     }
 
-    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+    @Override public KV<BeamRecord, BeamRecord> apply(BeamRecord input) {
       // build the type
       // the name of the join field is not important
       List<String> names = new ArrayList<>(joinColumns.size());
@@ -57,13 +58,15 @@ public class BeamJoinTransforms {
       for (int i = 0; i < joinColumns.size(); i++) {
         names.add("c" + i);
         types.add(isLeft
-            ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
-            input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
+            ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
+                .get(joinColumns.get(i).getKey())
+            : BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
+                .get(joinColumns.get(i).getValue()));
       }
-      BeamSqlRowType type = BeamSqlRowType.create(names, types);
+      BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
 
       // build the row
-      BeamSqlRow row = new BeamSqlRow(type);
+      BeamRecord row = new BeamRecord(type);
       for (int i = 0; i < joinColumns.size(); i++) {
         row.addField(i, input
             .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
@@ -76,14 +79,14 @@ public class BeamJoinTransforms {
   /**
    * A {@code DoFn} which implement the sideInput-JOIN.
    */
-  public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
-    private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
+  public static class SideInputJoinDoFn extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> {
+    private final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> sideInputView;
     private final JoinRelType joinType;
-    private final BeamSqlRow rightNullRow;
+    private final BeamRecord rightNullRow;
     private final boolean swap;
 
-    public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
-        PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
+    public SideInputJoinDoFn(JoinRelType joinType, BeamRecord rightNullRow,
+        PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> sideInputView,
         boolean swap) {
       this.joinType = joinType;
       this.rightNullRow = rightNullRow;
@@ -92,13 +95,13 @@ public class BeamJoinTransforms {
     }
 
     @ProcessElement public void processElement(ProcessContext context) {
-      BeamSqlRow key = context.element().getKey();
-      BeamSqlRow leftRow = context.element().getValue();
-      Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
-      Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
+      BeamRecord key = context.element().getKey();
+      BeamRecord leftRow = context.element().getValue();
+      Map<BeamRecord, Iterable<BeamRecord>> key2Rows = context.sideInput(sideInputView);
+      Iterable<BeamRecord> rightRowsIterable = key2Rows.get(key);
 
       if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
-        Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
+        Iterator<BeamRecord> it = rightRowsIterable.iterator();
         while (it.hasNext()) {
           context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
         }
@@ -115,11 +118,11 @@ public class BeamJoinTransforms {
    * A {@code SimpleFunction} to combine two rows into one.
    */
   public static class JoinParts2WholeRow
-      extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
-    @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
-      KV<BeamSqlRow, BeamSqlRow> parts = input.getValue();
-      BeamSqlRow leftRow = parts.getKey();
-      BeamSqlRow rightRow = parts.getValue();
+      extends SimpleFunction<KV<BeamRecord, KV<BeamRecord, BeamRecord>>, BeamRecord> {
+    @Override public BeamRecord apply(KV<BeamRecord, KV<BeamRecord, BeamRecord>> input) {
+      KV<BeamRecord, BeamRecord> parts = input.getValue();
+      BeamRecord leftRow = parts.getKey();
+      BeamRecord rightRow = parts.getValue();
       return combineTwoRowsIntoOne(leftRow, rightRow, false);
     }
   }
@@ -127,8 +130,8 @@ public class BeamJoinTransforms {
   /**
    * As the method name suggests: combine two rows into one wide row.
    */
-  private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
-      BeamSqlRow rightRow, boolean swap) {
+  private static BeamRecord combineTwoRowsIntoOne(BeamRecord leftRow,
+      BeamRecord rightRow, boolean swap) {
     if (swap) {
       return combineTwoRowsIntoOneHelper(rightRow, leftRow);
     } else {
@@ -139,19 +142,19 @@ public class BeamJoinTransforms {
   /**
    * As the method name suggests: combine two rows into one wide row.
    */
-  private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
-      BeamSqlRow rightRow) {
+  private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow,
+      BeamRecord rightRow) {
     // build the type
     List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
     names.addAll(leftRow.getDataType().getFieldsName());
     names.addAll(rightRow.getDataType().getFieldsName());
 
     List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
-    types.addAll(leftRow.getDataType().getFieldsType());
-    types.addAll(rightRow.getDataType().getFieldsType());
-    BeamSqlRowType type = BeamSqlRowType.create(names, types);
+    types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType());
+    types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
+    BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
 
-    BeamSqlRow row = new BeamSqlRow(type);
+    BeamRecord row = new BeamRecord(type);
     // build the row
     for (int i = 0; i < leftRow.size(); i++) {
       row.addField(i, leftRow.getFieldValue(i));

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
index 326b328..33ac807 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
@@ -20,10 +20,10 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
 
 import java.util.Iterator;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -35,8 +35,8 @@ public abstract class BeamSetOperatorsTransforms {
    * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}.
    */
   public static class BeamSqlRow2KvFn extends
-      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
-    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      SimpleFunction<BeamRecord, KV<BeamRecord, BeamRecord>> {
+    @Override public KV<BeamRecord, BeamRecord> apply(BeamRecord input) {
       return KV.of(input, input);
     }
   }
@@ -45,14 +45,14 @@ public abstract class BeamSetOperatorsTransforms {
    * Filter function used for Set operators.
    */
   public static class SetOperatorFilteringDoFn extends
-      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
-    private TupleTag<BeamSqlRow> leftTag;
-    private TupleTag<BeamSqlRow> rightTag;
+      DoFn<KV<BeamRecord, CoGbkResult>, BeamRecord> {
+    private TupleTag<BeamRecord> leftTag;
+    private TupleTag<BeamRecord> rightTag;
     private BeamSetOperatorRelBase.OpType opType;
     // ALL?
     private boolean all;
 
-    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
+    public SetOperatorFilteringDoFn(TupleTag<BeamRecord> leftTag, TupleTag<BeamRecord> rightTag,
         BeamSetOperatorRelBase.OpType opType, boolean all) {
       this.leftTag = leftTag;
       this.rightTag = rightTag;
@@ -62,13 +62,13 @@ public abstract class BeamSetOperatorsTransforms {
 
     @ProcessElement public void processElement(ProcessContext ctx) {
       CoGbkResult coGbkResult = ctx.element().getValue();
-      Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
-      Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
+      Iterable<BeamRecord> leftRows = coGbkResult.getAll(leftTag);
+      Iterable<BeamRecord> rightRows = coGbkResult.getAll(rightTag);
       switch (opType) {
         case UNION:
           if (all) {
             // output both left & right
-            Iterator<BeamSqlRow> iter = leftRows.iterator();
+            Iterator<BeamRecord> iter = leftRows.iterator();
             while (iter.hasNext()) {
               ctx.output(iter.next());
             }
@@ -84,7 +84,7 @@ public abstract class BeamSetOperatorsTransforms {
         case INTERSECT:
           if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
             if (all) {
-              for (BeamSqlRow leftRow : leftRows) {
+              for (BeamRecord leftRow : leftRows) {
                 ctx.output(leftRow);
               }
             } else {
@@ -94,7 +94,7 @@ public abstract class BeamSetOperatorsTransforms {
           break;
         case MINUS:
           if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
-            Iterator<BeamSqlRow> iter = leftRows.iterator();
+            Iterator<BeamRecord> iter = leftRows.iterator();
             if (all) {
               // output all
               while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
index 855de7a..31efeb7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
@@ -20,14 +20,14 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.BeamRecord;
 
 /**
  * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
  *
  */
-public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+public class BeamSqlFilterFn extends DoFn<BeamRecord, BeamRecord> {
 
   private String stepName;
   private BeamSqlExpressionExecutor executor;
@@ -45,7 +45,7 @@ public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
 
   @ProcessElement
   public void processElement(ProcessContext c) {
-    BeamSqlRow in = c.element();
+    BeamRecord in = c.element();
 
     List<Object> result = executor.execute(in);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
index b40cfa6..f97a90a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
@@ -17,14 +17,14 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.transform;
 
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.BeamRecord;
 
 /**
  * A test PTransform to display output in console.
  *
  */
-public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
+public class BeamSqlOutputToConsoleFn extends DoFn<BeamRecord, Void> {
 
   private String stepName;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index b3f7ce5..a95c743 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -20,24 +20,24 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.BeamRecord;
 
 /**
  *
  * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
  *
  */
-public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
   private String stepName;
   private BeamSqlExpressionExecutor executor;
-  private BeamSqlRowType outputRowType;
+  private BeamSqlRecordType outputRowType;
 
   public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
-      BeamSqlRowType outputRowType) {
+      BeamSqlRecordType outputRowType) {
     super();
     this.stepName = stepName;
     this.executor = executor;
@@ -51,10 +51,10 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
 
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) {
-    BeamSqlRow inputRow = c.element();
+    BeamRecord inputRow = c.element();
     List<Object> results = executor.execute(inputRow);
 
-    BeamSqlRow outRow = new BeamSqlRow(outputRowType);
+    BeamRecord outRow = new BeamRecord(outputRowType);
     outRow.updateWindowRange(inputRow, window);
 
     for (int idx = 0; idx < results.size(); ++idx) {

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index b80e045..bf96e85 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,27 +78,27 @@ public class CalciteUtils {
   /**
    * Get the {@code SqlTypeName} for the specified column of a table.
    */
-  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
+  public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
     return toCalciteType(schema.getFieldsType().get(index));
   }
 
   /**
    * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
    */
-  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
+  public static BeamSqlRecordType toBeamRowType(RelDataType tableInfo) {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (RelDataTypeField f : tableInfo.getFieldList()) {
       fieldNames.add(f.getName());
       fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
     }
-    return BeamSqlRowType.create(fieldNames, fieldTypes);
+    return BeamSqlRecordType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Create an instance of {@code RelDataType} so it can be used to create a table.
    */
-  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
+  public static RelProtoDataType toCalciteRowType(final BeamSqlRecordType that) {
     return new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a) {

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
index bf41c95..68b120e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
  * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
  */
 public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
-  protected BeamSqlRowType beamSqlRowType;
-  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+  protected BeamSqlRecordType beamSqlRowType;
+  public BaseBeamTable(BeamSqlRecordType beamSqlRowType) {
     this.beamSqlRowType = beamSqlRowType;
   }
 
-  @Override public BeamSqlRowType getRowType() {
+  @Override public BeamSqlRecordType getRowType() {
     return beamSqlRowType;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
index 5bbb8fd..68905b5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.schema;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PDone;
@@ -29,14 +30,14 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
-  private transient PCollection<BeamSqlRow> upstream;
+  private transient PCollection<BeamRecord> upstream;
 
-  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+  protected BeamPCollectionTable(BeamSqlRecordType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
-  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
-      BeamSqlRowType beamSqlRowType){
+  public BeamPCollectionTable(PCollection<BeamRecord> upstream,
+      BeamSqlRecordType beamSqlRowType){
     this(beamSqlRowType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
@@ -49,12 +50,12 @@ public class BeamPCollectionTable extends BaseBeamTable {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
     return upstream;
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
     throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
   }