You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:49 UTC

[6/7] beam git commit: refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder

refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89109b8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89109b8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89109b8c

Branch: refs/heads/DSL_SQL
Commit: 89109b8cdc667c4e07529e9748ed4290e88b9282
Parents: 129ae96
Author: mingmxu <mi...@ebay.com>
Authored: Thu Aug 3 12:11:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Aug 4 10:08:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java |  20 +-
 .../org/apache/beam/sdk/values/BeamRecord.java  |  22 +-
 .../apache/beam/sdk/values/BeamRecordType.java  |  70 ++++++
 .../beam/sdk/values/BeamRecordTypeProvider.java |  59 -----
 .../apache/beam/sdk/extensions/sql/BeamSql.java |  22 +-
 .../beam/sdk/extensions/sql/BeamSqlCli.java     |   8 +-
 .../beam/sdk/extensions/sql/BeamSqlEnv.java     |   6 +-
 .../extensions/sql/example/BeamSqlExample.java  |  27 ++-
 .../interpreter/BeamSqlExpressionExecutor.java  |   6 +-
 .../sql/impl/interpreter/BeamSqlFnExecutor.java |   6 +-
 .../operator/BeamSqlCaseExpression.java         |   4 +-
 .../operator/BeamSqlCastExpression.java         |   4 +-
 .../interpreter/operator/BeamSqlExpression.java |   8 +-
 .../operator/BeamSqlInputRefExpression.java     |   4 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |   6 +-
 .../operator/BeamSqlReinterpretExpression.java  |   4 +-
 .../operator/BeamSqlUdfExpression.java          |   4 +-
 .../operator/BeamSqlWindowEndExpression.java    |   4 +-
 .../operator/BeamSqlWindowExpression.java       |   4 +-
 .../operator/BeamSqlWindowStartExpression.java  |   4 +-
 .../arithmetic/BeamSqlArithmeticExpression.java |   4 +-
 .../comparison/BeamSqlCompareExpression.java    |   4 +-
 .../comparison/BeamSqlIsNotNullExpression.java  |   4 +-
 .../comparison/BeamSqlIsNullExpression.java     |   4 +-
 .../date/BeamSqlCurrentDateExpression.java      |   4 +-
 .../date/BeamSqlCurrentTimeExpression.java      |   4 +-
 .../date/BeamSqlCurrentTimestampExpression.java |   4 +-
 .../date/BeamSqlDateCeilExpression.java         |   4 +-
 .../date/BeamSqlDateFloorExpression.java        |   4 +-
 .../operator/date/BeamSqlExtractExpression.java |   4 +-
 .../operator/logical/BeamSqlAndExpression.java  |   4 +-
 .../operator/logical/BeamSqlNotExpression.java  |   4 +-
 .../operator/logical/BeamSqlOrExpression.java   |   4 +-
 .../math/BeamSqlMathBinaryExpression.java       |   4 +-
 .../math/BeamSqlMathUnaryExpression.java        |   4 +-
 .../operator/math/BeamSqlPiExpression.java      |   4 +-
 .../operator/math/BeamSqlRandExpression.java    |   4 +-
 .../math/BeamSqlRandIntegerExpression.java      |   4 +-
 .../string/BeamSqlCharLengthExpression.java     |   4 +-
 .../string/BeamSqlConcatExpression.java         |   4 +-
 .../string/BeamSqlInitCapExpression.java        |   4 +-
 .../operator/string/BeamSqlLowerExpression.java |   4 +-
 .../string/BeamSqlOverlayExpression.java        |   4 +-
 .../string/BeamSqlPositionExpression.java       |   4 +-
 .../string/BeamSqlSubstringExpression.java      |   4 +-
 .../operator/string/BeamSqlTrimExpression.java  |   4 +-
 .../operator/string/BeamSqlUpperExpression.java |   4 +-
 .../sql/impl/planner/BeamQueryPlanner.java      |   4 +-
 .../sql/impl/rel/BeamAggregationRel.java        |  38 ++--
 .../extensions/sql/impl/rel/BeamFilterRel.java  |  11 +-
 .../extensions/sql/impl/rel/BeamIOSinkRel.java  |   6 +-
 .../sql/impl/rel/BeamIOSourceRel.java           |  13 +-
 .../sql/impl/rel/BeamIntersectRel.java          |   4 +-
 .../extensions/sql/impl/rel/BeamJoinRel.java    |  75 ++++---
 .../extensions/sql/impl/rel/BeamMinusRel.java   |   4 +-
 .../extensions/sql/impl/rel/BeamProjectRel.java |  11 +-
 .../extensions/sql/impl/rel/BeamRelNode.java    |   4 +-
 .../sql/impl/rel/BeamSetOperatorRelBase.java    |  18 +-
 .../extensions/sql/impl/rel/BeamSortRel.java    |  49 ++---
 .../extensions/sql/impl/rel/BeamUnionRel.java   |   4 +-
 .../extensions/sql/impl/rel/BeamValuesRel.java  |  15 +-
 .../transform/BeamAggregationTransforms.java    |  47 ++--
 .../sql/impl/transform/BeamJoinTransforms.java  |  65 +++---
 .../transform/BeamSetOperatorsTransforms.java   |  24 +-
 .../sql/impl/transform/BeamSqlFilterFn.java     |   6 +-
 .../transform/BeamSqlOutputToConsoleFn.java     |   4 +-
 .../sql/impl/transform/BeamSqlProjectFn.java    |  14 +-
 .../extensions/sql/impl/utils/CalciteUtils.java |  10 +-
 .../extensions/sql/schema/BaseBeamTable.java    |   6 +-
 .../sql/schema/BeamPCollectionTable.java        |  13 +-
 .../sql/schema/BeamSqlRecordHelper.java         | 217 +++++++++++++++++++
 .../sql/schema/BeamSqlRecordType.java           | 168 ++++++++++++++
 .../sdk/extensions/sql/schema/BeamSqlRow.java   |  41 ----
 .../extensions/sql/schema/BeamSqlRowCoder.java  | 186 ----------------
 .../extensions/sql/schema/BeamSqlRowType.java   | 109 ----------
 .../sdk/extensions/sql/schema/BeamSqlTable.java |   7 +-
 .../extensions/sql/schema/BeamTableUtils.java   |  14 +-
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |  38 ++--
 .../sql/schema/kafka/BeamKafkaTable.java        |  20 +-
 .../sql/schema/text/BeamTextCSVTable.java       |  12 +-
 .../schema/text/BeamTextCSVTableIOReader.java   |  14 +-
 .../schema/text/BeamTextCSVTableIOWriter.java   |  16 +-
 .../sql/schema/text/BeamTextTable.java          |   4 +-
 .../sql/BeamSqlDslAggregationTest.java          |  80 +++----
 .../beam/sdk/extensions/sql/BeamSqlDslBase.java |  51 +++--
 .../extensions/sql/BeamSqlDslFilterTest.java    |  26 +--
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java  |  26 +--
 .../extensions/sql/BeamSqlDslProjectTest.java   |  64 +++---
 .../extensions/sql/BeamSqlDslUdfUdafTest.java   |  24 +-
 .../beam/sdk/extensions/sql/TestUtils.java      |  30 +--
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  10 +-
 .../sql/impl/rel/BeamIntersectRelTest.java      |   6 +-
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    |  10 +-
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  |  10 +-
 .../BeamJoinRelUnboundedVsUnboundedTest.java    |  10 +-
 .../sql/impl/rel/BeamMinusRelTest.java          |   6 +-
 .../impl/rel/BeamSetOperatorRelBaseTest.java    |   4 +-
 .../sql/impl/rel/BeamSortRelTest.java           |  12 +-
 .../sql/impl/rel/BeamUnionRelTest.java          |   6 +-
 .../sql/impl/rel/BeamValuesRelTest.java         |   8 +-
 .../sdk/extensions/sql/impl/rel/CheckSize.java  |   8 +-
 ...mSqlBuiltinFunctionsIntegrationTestBase.java |  17 +-
 ...amSqlComparisonOperatorsIntegrationTest.java |  11 +-
 .../BeamSqlDateFunctionsIntegrationTest.java    |  12 +-
 .../extensions/sql/mock/MockedBoundedTable.java |  24 +-
 .../sdk/extensions/sql/mock/MockedTable.java    |   8 +-
 .../sql/mock/MockedUnboundedTable.java          |  18 +-
 .../sql/schema/BeamSqlRowCoderTest.java         |   8 +-
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  14 +-
 .../sql/schema/text/BeamTextCSVTableTest.java   |  16 +-
 .../transform/BeamAggregationTransformTest.java |  72 +++---
 .../schema/transform/BeamTransformBaseTest.java |  18 +-
 112 files changed, 1171 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 27f92ce..06958a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -24,7 +24,7 @@ import java.util.BitSet;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.BeamRecordTypeProvider;
+import org.apache.beam.sdk.values.BeamRecordType;
 
 /**
  *  A {@link Coder} for {@link BeamRecord}. It wraps the {@link Coder} for each element directly.
@@ -34,31 +34,35 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   private static final BitSetCoder nullListCoder = BitSetCoder.of();
   private static final InstantCoder instantCoder = InstantCoder.of();
 
-  private BeamRecordTypeProvider recordType;
+  private BeamRecordType recordType;
   private List<Coder> coderArray;
 
-  private BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
+  private BeamRecordCoder(BeamRecordType recordType, List<Coder> coderArray) {
     this.recordType = recordType;
     this.coderArray = coderArray;
   }
 
-  public static BeamRecordCoder of(BeamRecordTypeProvider recordType, List<Coder> coderArray){
+  public static BeamRecordCoder of(BeamRecordType recordType, List<Coder> coderArray){
     if (recordType.size() != coderArray.size()) {
       throw new IllegalArgumentException("Coder size doesn't match with field size");
     }
     return new BeamRecordCoder(recordType, coderArray);
   }
 
+  public BeamRecordType getRecordType() {
+    return recordType;
+  }
+
   @Override
   public void encode(BeamRecord value, OutputStream outStream)
       throws CoderException, IOException {
     nullListCoder.encode(value.getNullFields(), outStream);
     for (int idx = 0; idx < value.size(); ++idx) {
-      if (value.getNullFields().get(idx)) {
+      if (value.isNull(idx)) {
         continue;
       }
 
-      coderArray.get(idx).encode(value.getInteger(idx), outStream);
+      coderArray.get(idx).encode(value.getFieldValue(idx), outStream);
     }
 
     instantCoder.encode(value.getWindowStart(), outStream);
@@ -70,7 +74,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
     BitSet nullFields = nullListCoder.decode(inStream);
 
     BeamRecord record = new BeamRecord(recordType);
-    record.setNullFields(nullFields);
     for (int idx = 0; idx < recordType.size(); ++idx) {
       if (nullFields.get(idx)) {
         continue;
@@ -88,5 +91,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   @Override
   public void verifyDeterministic()
       throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+    for (Coder c : coderArray) {
+      c.verifyDeterministic();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 476233e..bac649e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
 
 /**
  * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
- * {@link BeamRecordTypeProvider}, represents one element in a
+ * {@link BeamRecordType}, represents one element in a
  * {@link org.apache.beam.sdk.values.PCollection}.
  */
 @Experimental
@@ -40,12 +40,12 @@ public class BeamRecord implements Serializable {
   private List<Object> dataValues;
   //null values are indexed here, to handle properly in Coder.
   private BitSet nullFields;
-  private BeamRecordTypeProvider dataType;
+  private BeamRecordType dataType;
 
   private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
   private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
-  public BeamRecord(BeamRecordTypeProvider dataType) {
+  public BeamRecord(BeamRecordType dataType) {
     this.dataType = dataType;
     this.nullFields = new BitSet(dataType.size());
     this.dataValues = new ArrayList<>();
@@ -55,7 +55,7 @@ public class BeamRecord implements Serializable {
     }
   }
 
-  public BeamRecord(BeamRecordTypeProvider dataType, List<Object> dataValues) {
+  public BeamRecord(BeamRecordType dataType, List<Object> dataValues) {
     this(dataType);
     for (int idx = 0; idx < dataValues.size(); ++idx) {
       addField(idx, dataValues.get(idx));
@@ -137,10 +137,6 @@ public class BeamRecord implements Serializable {
   }
 
   public Object getFieldValue(int fieldIdx) {
-    if (nullFields.get(fieldIdx)) {
-      return null;
-    }
-
     return dataValues.get(fieldIdx);
   }
 
@@ -200,22 +196,14 @@ public class BeamRecord implements Serializable {
     this.dataValues = dataValues;
   }
 
-  public BeamRecordTypeProvider getDataType() {
+  public BeamRecordType getDataType() {
     return dataType;
   }
 
-  public void setDataType(BeamRecordTypeProvider dataType) {
-    this.dataType = dataType;
-  }
-
   public BitSet getNullFields() {
     return nullFields;
   }
 
-  public void setNullFields(BitSet nullFields) {
-    this.nullFields = nullFields;
-  }
-
   /**
    * is the specified field NULL?
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
new file mode 100644
index 0000000..3b20b50
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * The default type provider used in {@link BeamRecord}.
+ */
+@Experimental
+public class BeamRecordType implements Serializable{
+  private List<String> fieldsName;
+  private List<Coder> fieldsCoder;
+
+  public BeamRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
+    this.fieldsName = fieldsName;
+    this.fieldsCoder = fieldsCoder;
+  }
+
+  /**
+   * Validate input fieldValue for a field.
+   * @throws IllegalArgumentException throw exception when the validation fails.
+   */
+   public void validateValueType(int index, Object fieldValue)
+      throws IllegalArgumentException{
+     //do nothing by default.
+   }
+
+   /**
+    * Get the coder for {@link BeamRecordCoder}.
+    */
+   public BeamRecordCoder getRecordCoder(){
+     return BeamRecordCoder.of(this, fieldsCoder);
+   }
+
+   public List<String> getFieldsName(){
+     return fieldsName;
+   }
+
+   public String getFieldByIndex(int index){
+     return fieldsName.get(index);
+   }
+
+   public int findIndexOfField(String fieldName){
+     return fieldsName.indexOf(fieldName);
+   }
+
+  public int size(){
+    return fieldsName.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
deleted file mode 100644
index 63a961c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.values;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * The default type provider used in {@link BeamRecord}.
- */
-@Experimental
-public class BeamRecordTypeProvider implements Serializable{
-  private List<String> fieldsName;
-
-  public BeamRecordTypeProvider(List<String> fieldsName) {
-    this.fieldsName = fieldsName;
-  }
-
-  /**
-   * Validate input fieldValue for a field.
-   * @throws IllegalArgumentException throw exception when the validation fails.
-   */
-   public void validateValueType(int index, Object fieldValue)
-      throws IllegalArgumentException{
-     //do nothing by default.
-   }
-
-   public List<String> getFieldsName(){
-     return fieldsName;
-   }
-
-   public String getFieldByIndex(int index){
-     return fieldsName.get(index);
-   }
-
-   public int findIndexOfField(String fieldName){
-     return fieldsName.indexOf(fieldName);
-   }
-
-  public int size(){
-    return fieldsName.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index 0dabf40..86e4d8d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -19,13 +19,14 @@ package org.apache.beam.sdk.extensions.sql;
 
 import com.google.auto.value.AutoValue;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -111,7 +112,7 @@ public class BeamSql {
    */
   @AutoValue
   public abstract static class QueryTransform extends
-      PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+      PTransform<PCollectionTuple, PCollection<BeamRecord>> {
     abstract BeamSqlEnv getSqlEnv();
     abstract String getSqlQuery();
 
@@ -143,7 +144,7 @@ public class BeamSql {
      }
 
     @Override
-    public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+    public PCollection<BeamRecord> expand(PCollectionTuple input) {
       registerTables(input);
 
       BeamRelNode beamRelNode = null;
@@ -163,11 +164,12 @@ public class BeamSql {
     //register tables, related with input PCollections.
     private void registerTables(PCollectionTuple input){
       for (TupleTag<?> sourceTag : input.getAll().keySet()) {
-        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
-        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+        PCollection<BeamRecord> sourceStream = (PCollection<BeamRecord>) input.get(sourceTag);
+        BeamRecordCoder sourceCoder = (BeamRecordCoder) sourceStream.getCoder();
 
         getSqlEnv().registerTable(sourceTag.getId(),
-            new BeamPCollectionTable(sourceStream, sourceCoder.getSqlRecordType()));
+            new BeamPCollectionTable(sourceStream,
+                (BeamSqlRecordType) sourceCoder.getRecordType()));
       }
     }
   }
@@ -178,7 +180,7 @@ public class BeamSql {
    */
   @AutoValue
   public abstract static class SimpleQueryTransform
-      extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+      extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> {
     private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
     abstract BeamSqlEnv getSqlEnv();
     abstract String getSqlQuery();
@@ -232,9 +234,9 @@ public class BeamSql {
     }
 
     @Override
-    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+    public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) {
       validateQuery();
-      return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+      return PCollectionTuple.of(new TupleTag<BeamRecord>(PCOLLECTION_TABLE_NAME), input)
           .apply(QueryTransform.builder()
               .setSqlEnv(getSqlEnv())
               .setSqlQuery(getSqlQuery())

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 967dee5..a43808e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.extensions.sql;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.plan.RelOptUtil;
 
@@ -43,7 +43,7 @@ public class BeamSqlCli {
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
       throws Exception{
     PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
         .as(PipelineOptions.class); // FlinkPipelineOptions.class
@@ -56,9 +56,9 @@ public class BeamSqlCli {
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline,
+  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline,
       BeamSqlEnv sqlEnv) throws Exception{
-    PCollection<BeamSqlRow> resultStream =
+    PCollection<BeamRecord> resultStream =
         sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
     return resultStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index be0b0af..3c5eb36 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.calcite.DataContext;
@@ -84,8 +84,8 @@ public class BeamSqlEnv implements Serializable{
   }
 
   private static class BeamCalciteTable implements ScannableTable, Serializable {
-    private BeamSqlRowType beamSqlRowType;
-    public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+    private BeamSqlRecordType beamSqlRowType;
+    public BeamCalciteTable(BeamSqlRecordType beamSqlRowType) {
       this.beamSqlRowType = beamSqlRowType;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 21e02a7..fbc1fd8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -22,14 +22,13 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -54,39 +53,39 @@ class BeamSqlExample {
     //define the input row format
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
-    BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
-    BeamSqlRow row = new BeamSqlRow(type);
+    BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+    BeamRecord row = new BeamRecord(type);
     row.addField(0, 1);
     row.addField(1, "row");
     row.addField(2, 1.0);
 
     //create a source PCollection with Create.of();
-    PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-        .withCoder(new BeamSqlRowCoder(type)));
+    PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
+        .withCoder(type.getRecordCoder()));
 
     //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
-    PCollection<BeamSqlRow> outputStream = inputTable.apply(
+    PCollection<BeamRecord> outputStream = inputTable.apply(
         BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
 
     //print the output record of case 1;
     outputStream.apply("log_result",
-        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
-      public Void apply(BeamSqlRow input) {
+        MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
+      public Void apply(BeamRecord input) {
         System.out.println("PCOLLECTION: " + input);
         return null;
       }
     }));
 
     //Case 2. run the query with BeamSql.query over result PCollection of case 1.
-    PCollection<BeamSqlRow> outputStream2 =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream)
+    PCollection<BeamRecord> outputStream2 =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
         .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
 
     //print the output record of case 2;
     outputStream2.apply("log_result",
-        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+        MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
       @Override
-      public Void apply(BeamSqlRow input) {
+      public Void apply(BeamRecord input) {
         System.out.println("TABLE_B: " + input);
         return null;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
index 1ae6bb3..3cd6d65 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
 
 import java.io.Serializable;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 
 /**
  * {@code BeamSqlExpressionExecutor} fills the gap between relational
@@ -34,10 +34,10 @@ public interface BeamSqlExpressionExecutor extends Serializable {
   void prepare();
 
   /**
-   * apply transformation to input record {@link BeamSqlRow}.
+   * Apply the transformation to the input record {@link BeamRecord}.
    *
    */
-  List<Object> execute(BeamSqlRow inputRow);
+  List<Object> execute(BeamRecord inputRow);
 
   void close();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 1f9e0e3..0f77ed8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -88,7 +88,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamS
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
@@ -102,7 +102,7 @@ import org.apache.calcite.util.NlsString;
 /**
  * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
  * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
- * which can be evaluated against the {@link BeamSqlRow}.
+ * which can be evaluated against the {@link BeamRecord}.
  *
  */
 public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
@@ -427,7 +427,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
   }
 
   @Override
-  public List<Object> execute(BeamSqlRow inputRow) {
+  public List<Object> execute(BeamRecord inputRow) {
     List<Object> results = new ArrayList<>();
     for (BeamSqlExpression exp : exps) {
       results.add(exp.evaluate(inputRow).getValue());

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
index 61e8aae..af48cbe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -48,7 +48,7 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     for (int i = 0; i < operands.size() - 1; i += 2) {
       if (opValueEvaluated(i, inputRow)) {
         return BeamSqlPrimitive.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
index c98c10d..3786281 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.runtime.SqlFunctions;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.joda.time.format.DateTimeFormat;
@@ -71,7 +71,7 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     SqlTypeName castOutputType = getOutputType();
     switch (castOutputType) {
       case INTEGER:

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
index dc5db81..f42a365 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.io.Serializable;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -49,7 +49,7 @@ public abstract class BeamSqlExpression implements Serializable {
     return op(idx).getOutputType();
   }
 
-  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+  public <T> T opValueEvaluated(int idx, BeamRecord row) {
     return (T) op(idx).evaluate(row).getValue();
   }
 
@@ -59,10 +59,10 @@ public abstract class BeamSqlExpression implements Serializable {
   public abstract boolean accept();
 
   /**
-   * Apply input record {@link BeamSqlRow} to this expression,
+   * Apply input record {@link BeamRecord} to this expression,
    * the output value is wrapped with {@link BeamSqlPrimitive}.
    */
-  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+  public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow);
 
   public List<BeamSqlExpression> getOperands() {
     return operands;

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
index 7aba024..8c3d4d4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 6380af9..f763898 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -21,13 +21,13 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
 
 /**
  * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
+ * It holds the value and returns it directly during {@link #evaluate(BeamRecord)}.
  *
  */
 public class BeamSqlPrimitive<T> extends BeamSqlExpression {
@@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow) {
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
index 243baaa..c1fa2c7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -41,7 +41,7 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression {
         && SqlTypeName.DATETIME_TYPES.contains(opType(0));
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     if (opType(0) == SqlTypeName.TIME) {
       GregorianCalendar date = opValueEvaluated(0, inputRow);
       return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index eebb97c..da706f3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -51,7 +51,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     if (method == null) {
       reConstructMethod();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
index 0bd68df..2f4c165 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -34,7 +34,7 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
         new Date(inputRow.getWindowEnd().getMillis()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
index b560ef8..2f3dd5c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.Date;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -42,7 +42,7 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
         (Date) operands.get(0).evaluate(inputRow).getValue());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
index e2c1b34..9186ec0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -35,7 +35,7 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
         new Date(inputRow.getWindowStart().getMillis()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index b07b28f..fd36457 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -50,7 +50,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
     super(operands, outputType);
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
     BigDecimal left = BigDecimal.valueOf(
         Double.valueOf(opValueEvaluated(0, inputRow).toString()));
     BigDecimal right = BigDecimal.valueOf(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
index 811b582..93032ae 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -51,7 +51,7 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
     Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     Object rightValue = operands.get(1).evaluate(inputRow).getValue();
     switch (operands.get(0).getOutputType()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
index 88dc73f..7177d96 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -46,7 +46,7 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
     Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
index b626ce7..c74fcd9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -46,7 +46,7 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
     Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index d5793d5..86abe43 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -22,7 +22,7 @@ import java.util.Collections;
 import java.util.Date;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -38,7 +38,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
     return getOperands().size() == 0;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index 99eea95..d8de464 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.TimeZone;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -44,7 +44,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
     ret.setTime(new Date());
     return BeamSqlPrimitive.of(outputType, ret);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 09a3c60..4736571 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -22,7 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -42,7 +42,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 55b6fcd..55767fa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -22,7 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -41,7 +41,7 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     Date date = opValueEvaluated(0, inputRow);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
index f031c31..3310da5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -22,7 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -41,7 +41,7 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     Date date = opValueEvaluated(0, inputRow);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
index 2740f82..47cd879 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -61,7 +61,7 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.BIGINT;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     Long time = opValueEvaluated(1, inputRow);
 
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
index 0c8854c..b8964d5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -32,7 +32,7 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
     boolean result = true;
     for (BeamSqlExpression exp : operands) {
       BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
index 65634b0..f9578b9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -42,7 +42,7 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
     return super.accept();
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     Boolean value = opValueEvaluated(0, inputRow);
     if (value == null) {
       return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
index da15c34..88a3916 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -32,7 +32,7 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
     boolean result = false;
     for (BeamSqlExpression exp : operands) {
       BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index c12b725..8f6c00c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -38,7 +38,7 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
     return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
     BeamSqlExpression leftOp = op(0);
     BeamSqlExpression rightOp = op(1);
     return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index 163c40e..b225b8e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 
@@ -45,7 +45,7 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
     return acceptance;
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
     BeamSqlExpression operand = op(0);
     return calculate(operand.evaluate(inputRow));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
index dfaf546..676f859 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
 
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
index f2d7a47..0575978 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -42,7 +42,7 @@ public class BeamSqlRandExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
     if (operands.size() == 1) {
       int rowSeed = opValueEvaluated(0, inputRecord);
       if (seed == null || seed != rowSeed) {

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
index b2e65ce..52f0cc1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -43,7 +43,7 @@ public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
     int numericIdx = 0;
     if (operands.size() == 2) {
       int rowSeed = opValueEvaluated(0, inputRecord);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 580d747..974e2bc 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -32,7 +32,7 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.INTEGER);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
index 772ad41..14ef55d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -51,7 +51,7 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String left = opValueEvaluated(0, inputRow);
     String right = opValueEvaluated(1, inputRow);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
index dc893e7..e50872b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -32,7 +32,7 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String str = opValueEvaluated(0, inputRow);
 
     StringBuilder ret = new StringBuilder(str);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
index fd9d7aa..0f9a501 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -32,7 +32,7 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
index 8d38efb..2336876 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -54,7 +54,7 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
     String str = opValueEvaluated(0, inputRow);
     String replaceStr = opValueEvaluated(1, inputRow);
     int idx = opValueEvaluated(2, inputRow);