Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:47 UTC

[4/7] beam git commit: refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
new file mode 100644
index 0000000..b910c84
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * Helper class that provides {@link Coder}s for the field types of a {@link BeamRecord}.
+ */
+@Experimental
+public class BeamSqlRecordHelper {
+
+  public static BeamSqlRecordType getSqlRecordType(BeamRecord record) {
+    return (BeamSqlRecordType) record.getDataType();
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Short}.
+   */
+  public static class ShortCoder extends CustomCoder<Short> {
+    private static final ShortCoder INSTANCE = new ShortCoder();
+
+    public static ShortCoder of() {
+      return INSTANCE;
+    }
+
+    private ShortCoder() {
+    }
+
+    @Override
+    public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
+      new DataOutputStream(outStream).writeShort(value);
+    }
+
+    @Override
+    public Short decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readShort();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+  /**
+   * {@link Coder} for Java type {@link Float}; the value is stored as a {@link BigDecimal}.
+   */
+  public static class FloatCoder extends CustomCoder<Float> {
+    private static final FloatCoder INSTANCE = new FloatCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    public static FloatCoder of() {
+      return INSTANCE;
+    }
+
+    private FloatCoder() {
+    }
+
+    @Override
+    public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Float decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).floatValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+  /**
+   * {@link Coder} for Java type {@link Double}; the value is stored as a {@link BigDecimal}.
+   */
+  public static class DoubleCoder extends CustomCoder<Double> {
+    private static final DoubleCoder INSTANCE = new DoubleCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    public static DoubleCoder of() {
+      return INSTANCE;
+    }
+
+    private DoubleCoder() {
+    }
+
+    @Override
+    public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Double decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).doubleValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link GregorianCalendar}; the value is stored as a {@link Long}.
+   */
+  public static class TimeCoder extends CustomCoder<GregorianCalendar> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final TimeCoder INSTANCE = new TimeCoder();
+
+    public static TimeCoder of() {
+      return INSTANCE;
+    }
+
+    private TimeCoder() {
+    }
+
+    @Override
+    public void encode(GregorianCalendar value, OutputStream outStream)
+        throws CoderException, IOException {
+      longCoder.encode(value.getTime().getTime(), outStream);
+    }
+
+    @Override
+    public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
+      GregorianCalendar calendar = new GregorianCalendar();
+      calendar.setTime(new Date(longCoder.decode(inStream)));
+      return calendar;
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+  /**
+   * {@link Coder} for Java type {@link Date}; the value is stored as a {@link Long}.
+   */
+  public static class DateCoder extends CustomCoder<Date> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final DateCoder INSTANCE = new DateCoder();
+
+    public static DateCoder of() {
+      return INSTANCE;
+    }
+
+    private DateCoder() {
+    }
+
+    @Override
+    public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
+      longCoder.encode(value.getTime(), outStream);
+    }
+
+    @Override
+    public Date decode(InputStream inStream) throws CoderException, IOException {
+      return new Date(longCoder.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Boolean}.
+   */
+  public static class BooleanCoder extends CustomCoder<Boolean> {
+    private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+    public static BooleanCoder of() {
+      return INSTANCE;
+    }
+
+    private BooleanCoder() {
+    }
+
+    @Override
+    public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
+      new DataOutputStream(outStream).writeBoolean(value);
+    }
+
+    @Override
+    public Boolean decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readBoolean();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+}
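
A minimal round-trip sketch for one of the field coders added above (the wrapper class name is illustrative; the coder API is exactly as defined in the new file):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;

    public class ShortCoderRoundTrip {
      public static void main(String[] args) throws Exception {
        ShortCoder coder = ShortCoder.of();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode((short) 42, out);  // two bytes, via DataOutputStream.writeShort
        Short decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(decoded);    // prints 42
      }
    }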

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
new file mode 100644
index 0000000..b295049
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} with SQL types.
+ *
+ * <p>Only a limited set of SQL types is supported for now; see
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for more details.
+ *
+ */
+public class BeamSqlRecordType extends BeamRecordType {
+  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  public List<Integer> fieldsType;
+
+  protected BeamSqlRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+  }
+
+  private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType,
+      List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+    this.fieldsType = fieldsType;
+  }
+
+  public static BeamSqlRecordType create(List<String> fieldNames,
+      List<Integer> fieldTypes) {
+    List<Coder> fieldCoders = new ArrayList<>();
+    for (int idx = 0; idx < fieldTypes.size(); ++idx) {
+      switch (fieldTypes.get(idx)) {
+      case Types.INTEGER:
+        fieldCoders.add(BigEndianIntegerCoder.of());
+        break;
+      case Types.SMALLINT:
+        fieldCoders.add(ShortCoder.of());
+        break;
+      case Types.TINYINT:
+        fieldCoders.add(ByteCoder.of());
+        break;
+      case Types.DOUBLE:
+        fieldCoders.add(DoubleCoder.of());
+        break;
+      case Types.FLOAT:
+        fieldCoders.add(FloatCoder.of());
+        break;
+      case Types.DECIMAL:
+        fieldCoders.add(BigDecimalCoder.of());
+        break;
+      case Types.BIGINT:
+        fieldCoders.add(BigEndianLongCoder.of());
+        break;
+      case Types.VARCHAR:
+      case Types.CHAR:
+        fieldCoders.add(StringUtf8Coder.of());
+        break;
+      case Types.TIME:
+        fieldCoders.add(TimeCoder.of());
+        break;
+      case Types.DATE:
+      case Types.TIMESTAMP:
+        fieldCoders.add(DateCoder.of());
+        break;
+      case Types.BOOLEAN:
+        fieldCoders.add(BooleanCoder.of());
+        break;
+
+      default:
+        throw new UnsupportedOperationException(
+            "Data type: " + fieldTypes.get(idx) + " not supported yet!");
+      }
+    }
+    return new BeamSqlRecordType(fieldNames, fieldTypes, fieldCoders);
+  }
+
+  @Override
+  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    int fieldType = fieldsType.get(index);
+    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+    if (javaClazz == null) {
+      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  public List<Integer> getFieldsType() {
+    return fieldsType;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof BeamSqlRecordType) {
+      BeamSqlRecordType ins = (BeamSqlRecordType) obj;
+      return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
+  }
+}
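
A short sketch of creating the new type and pairing it with a record, mirroring the test usage later in this commit (field names and values here are illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.values.BeamRecord;

    // create() resolves one Coder per java.sql.Types constant, and throws
    // UnsupportedOperationException for an unmapped type.
    BeamSqlRecordType type = BeamSqlRecordType.create(
        Arrays.asList("f_int", "f_string"),
        Arrays.asList(Types.INTEGER, Types.VARCHAR));

    BeamRecord row = new BeamRecord(type);
    row.addField("f_int", 1);          // value class is checked by validateValueType
    row.addField("f_string", "hello");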

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
deleted file mode 100644
index cb5c7ea..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.util.List;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link BeamSqlRow} represents one row element in a {@link PCollection},
- * with type provider {@link BeamSqlRowType}.
- */
-public class BeamSqlRow extends BeamRecord {
-  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
-    super(dataType, dataValues);
-  }
-
-  public BeamSqlRow(BeamSqlRowType dataType) {
-    super(dataType);
-  }
-
-  @Override
-  public BeamSqlRowType getDataType() {
-    return (BeamSqlRowType) super.getDataType();
-  }
-}
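
The subclass is dropped rather than renamed: call sites construct the base BeamRecord directly and recover the SQL schema through the new helper. Roughly (rowType/recordType stand for any schema instance):

    // before this commit
    BeamSqlRow row = new BeamSqlRow(rowType);
    BeamSqlRowType sqlType = row.getDataType();

    // after this commit
    BeamRecord row = new BeamRecord(recordType);
    BeamSqlRecordType sqlType = BeamSqlRecordHelper.getSqlRecordType(row);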

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
deleted file mode 100644
index c7656af..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.Types;
-import java.util.BitSet;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.BitSetCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- *  A {@link Coder} encodes {@link BeamSqlRow}.
- */
-public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
-  private BeamSqlRowType sqlRecordType;
-
-  private static final BitSetCoder nullListCoder = BitSetCoder.of();
-
-  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
-  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
-  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-  private static final DoubleCoder doubleCoder = DoubleCoder.of();
-  private static final InstantCoder instantCoder = InstantCoder.of();
-  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
-  private static final ByteCoder byteCoder = ByteCoder.of();
-
-  public BeamSqlRowCoder(BeamSqlRowType sqlRecordType) {
-    this.sqlRecordType = sqlRecordType;
-  }
-
-  @Override
-  public void encode(BeamSqlRow value, OutputStream outStream)
-      throws CoderException, IOException {
-    nullListCoder.encode(value.getNullFields(), outStream);
-    for (int idx = 0; idx < value.size(); ++idx) {
-      if (value.getNullFields().get(idx)) {
-        continue;
-      }
-
-      switch (sqlRecordType.getFieldsType().get(idx)) {
-        case Types.INTEGER:
-          intCoder.encode(value.getInteger(idx), outStream);
-          break;
-        case Types.SMALLINT:
-          intCoder.encode((int) value.getShort(idx), outStream);
-          break;
-        case Types.TINYINT:
-          byteCoder.encode(value.getByte(idx), outStream);
-          break;
-        case Types.DOUBLE:
-          doubleCoder.encode(value.getDouble(idx), outStream);
-          break;
-        case Types.FLOAT:
-          doubleCoder.encode((double) value.getFloat(idx), outStream);
-          break;
-        case Types.DECIMAL:
-          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
-          break;
-        case Types.BIGINT:
-          longCoder.encode(value.getLong(idx), outStream);
-          break;
-        case Types.VARCHAR:
-        case Types.CHAR:
-          stringCoder.encode(value.getString(idx), outStream);
-          break;
-        case Types.TIME:
-          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
-          break;
-        case Types.DATE:
-        case Types.TIMESTAMP:
-          longCoder.encode(value.getDate(idx).getTime(), outStream);
-          break;
-        case Types.BOOLEAN:
-          byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Data type: " + sqlRecordType.getFieldsType().get(idx) + " not supported yet!");
-      }
-    }
-
-    instantCoder.encode(value.getWindowStart(), outStream);
-    instantCoder.encode(value.getWindowEnd(), outStream);
-  }
-
-  @Override
-  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
-    BitSet nullFields = nullListCoder.decode(inStream);
-
-    BeamSqlRow record = new BeamSqlRow(sqlRecordType);
-    record.setNullFields(nullFields);
-    for (int idx = 0; idx < sqlRecordType.size(); ++idx) {
-      if (nullFields.get(idx)) {
-        continue;
-      }
-
-      switch (sqlRecordType.getFieldsType().get(idx)) {
-        case Types.INTEGER:
-          record.addField(idx, intCoder.decode(inStream));
-          break;
-        case Types.SMALLINT:
-          record.addField(idx, intCoder.decode(inStream).shortValue());
-          break;
-        case Types.TINYINT:
-          record.addField(idx, byteCoder.decode(inStream));
-          break;
-        case Types.DOUBLE:
-          record.addField(idx, doubleCoder.decode(inStream));
-          break;
-        case Types.FLOAT:
-          record.addField(idx, doubleCoder.decode(inStream).floatValue());
-          break;
-        case Types.BIGINT:
-          record.addField(idx, longCoder.decode(inStream));
-          break;
-        case Types.DECIMAL:
-          record.addField(idx, bigDecimalCoder.decode(inStream));
-          break;
-        case Types.VARCHAR:
-        case Types.CHAR:
-          record.addField(idx, stringCoder.decode(inStream));
-          break;
-        case Types.TIME:
-          GregorianCalendar calendar = new GregorianCalendar();
-          calendar.setTime(new Date(longCoder.decode(inStream)));
-          record.addField(idx, calendar);
-          break;
-        case Types.DATE:
-        case Types.TIMESTAMP:
-          record.addField(idx, new Date(longCoder.decode(inStream)));
-          break;
-        case Types.BOOLEAN:
-          record.addField(idx, byteCoder.decode(inStream) == 1);
-          break;
-
-        default:
-          throw new UnsupportedOperationException("Data type: "
-              + sqlRecordType.getFieldsType().get(idx)
-              + " not supported yet!");
-      }
-    }
-
-    record.setWindowStart(instantCoder.decode(inStream));
-    record.setWindowEnd(instantCoder.decode(inStream));
-
-    return record;
-  }
-
-  public BeamSqlRowType getSqlRecordType() {
-    return sqlRecordType;
-  }
-
-  @Override
-  public void verifyDeterministic()
-      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
deleted file mode 100644
index 7584dad..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.values.BeamRecordTypeProvider;
-
-/**
- * Type provider for {@link BeamSqlRow} with SQL types.
- *
- * <p>Limited SQL types are supported now, visit
- * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
- * for more details.
- *
- */
-public class BeamSqlRowType extends BeamRecordTypeProvider {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
-  static {
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
-
-  public List<Integer> fieldsType;
-
-  protected BeamSqlRowType(List<String> fieldsName) {
-    super(fieldsName);
-  }
-
-  public BeamSqlRowType(List<String> fieldsName, List<Integer> fieldsType) {
-    super(fieldsName);
-    this.fieldsType = fieldsType;
-  }
-
-  public static BeamSqlRowType create(List<String> fieldNames,
-      List<Integer> fieldTypes) {
-    return new BeamSqlRowType(fieldNames, fieldTypes);
-  }
-
-  @Override
-  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
-    int fieldType = fieldsType.get(index);
-    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
-    if (javaClazz == null) {
-      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
-    }
-
-    if (!fieldValue.getClass().equals(javaClazz)) {
-      throw new IllegalArgumentException(
-          String.format("[%s](%s) doesn't match type [%s]",
-              fieldValue, fieldValue.getClass(), fieldType)
-      );
-    }
-  }
-
-  public List<Integer> getFieldsType() {
-    return fieldsType;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj != null && obj instanceof BeamSqlRowType) {
-      BeamSqlRowType ins = (BeamSqlRowType) obj;
-      return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
index c179935..b370d9d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.schema;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
@@ -37,16 +38,16 @@ public interface BeamSqlTable {
   * create a {@code PCollection<BeamRecord>} from source.
    *
    */
-  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+  PCollection<BeamRecord> buildIOReader(Pipeline pipeline);
 
   /**
   * create an {@code IO.write()} instance to write to target.
    *
    */
-   PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+   PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter();
 
   /**
    * Get the schema info of the table.
    */
-   BeamSqlRowType getRowType();
+   BeamSqlRecordType getRowType();
 }
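
With these signature changes, a table hands plain BeamRecord collections to and from the pipeline; a minimal usage sketch, where table is any BeamSqlTable implementation and pipeline an existing Pipeline:

    PCollection<BeamRecord> rows = table.buildIOReader(pipeline);   // source side
    rows.apply("persist", table.buildIOWriter());                   // sink side
    BeamSqlRecordType schema = table.getRowType();                  // schema info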

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index c769928..63c9720 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -23,6 +23,7 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
 import org.apache.commons.csv.CSVFormat;
@@ -34,11 +35,11 @@ import org.apache.commons.csv.CSVRecord;
  * Utility methods for working with {@code BeamTable}.
  */
 public final class BeamTableUtils {
-  public static BeamSqlRow csvLine2BeamSqlRow(
+  public static BeamRecord csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSqlRowType beamSqlRowType) {
-    BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
+      BeamSqlRecordType beamSqlRowType) {
+    BeamRecord row = new BeamRecord(beamSqlRowType);
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
@@ -60,7 +61,7 @@ public final class BeamTableUtils {
     return row;
   }
 
-  public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
+  public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
     StringWriter writer = new StringWriter();
     try (CSVPrinter printer = csvFormat.print(writer)) {
       for (int i = 0; i < row.size(); i++) {
@@ -73,13 +74,14 @@ public final class BeamTableUtils {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
+  public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object rawObj) {
     if (rawObj == null) {
       row.addField(idx, null);
       return;
     }
 
-    SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
+    SqlTypeName columnType = CalciteUtils.getFieldType(
+        BeamSqlRecordHelper.getSqlRecordType(row), idx);
      // auto-casting for numerics
     if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
         || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
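
Taken together, these utilities round-trip between a CSV line and a BeamRecord; a minimal sketch using only the signatures above (the schema and sample line are illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.commons.csv.CSVFormat;

    BeamSqlRecordType type = BeamSqlRecordType.create(
        Arrays.asList("id", "name"),
        Arrays.asList(Types.INTEGER, Types.VARCHAR));

    // "1" is promoted to an Integer by addFieldWithAutoTypeCasting.
    BeamRecord row = BeamTableUtils.csvLine2BeamSqlRow(CSVFormat.DEFAULT, "1,alice", type);
    String line = BeamTableUtils.beamSqlRow2CsvLine(row, CSVFormat.DEFAULT);  // "1,alice"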

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
index 2a50947..f137379 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -18,12 +18,12 @@
 package org.apache.beam.sdk.extensions.sql.schema.kafka;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.csv.CSVFormat;
@@ -34,45 +34,45 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
   private CSVFormat csvFormat;
-  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
     this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
   }
 
-  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
       List<String> topics, CSVFormat format) {
     super(beamSqlRowType, bootstrapServers, topics);
     this.csvFormat = format;
   }
 
   @Override
-  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
       getPTransformForInput() {
     return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
   }
 
   @Override
-  public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+  public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput() {
     return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
   }
 
   /**
-   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
    *
    */
   public static class CsvRecorderDecoder
-      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
-    private BeamSqlRowType rowType;
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
+    private BeamSqlRecordType rowType;
     private CSVFormat format;
-    public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+    public CsvRecorderDecoder(BeamSqlRecordType rowType, CSVFormat format) {
       this.rowType = rowType;
       this.format = format;
     }
 
     @Override
-    public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
-      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
+    public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
           String rowInString = new String(c.element().getValue());
@@ -83,24 +83,24 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
   }
 
   /**
-   * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
+   * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
    *
    */
   public static class CsvRecorderEncoder
-      extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
-    private BeamSqlRowType rowType;
+      extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
+    private BeamSqlRecordType rowType;
     private CSVFormat format;
-    public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+    public CsvRecorderEncoder(BeamSqlRecordType rowType, CSVFormat format) {
       this.rowType = rowType;
       this.format = format;
     }
 
     @Override
-    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
-      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
-          BeamSqlRow in = c.element();
+          BeamRecord in = c.element();
           c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
         }
       }));
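
A construction sketch for the renamed Kafka table (the server address and topic are placeholders; type is a BeamSqlRecordType as in the earlier sketches):

    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.kafka.BeamKafkaCSVTable;

    BeamKafkaCSVTable table = new BeamKafkaCSVTable(
        type,
        "localhost:9092",          // placeholder bootstrap servers
        Arrays.asList("orders"));  // placeholder topic
    // buildIOReader(...) then yields PCollection<BeamRecord> via CsvRecorderDecoder.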

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
index 2cc664f..fac57bf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -26,10 +26,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -48,11 +48,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   private List<String> topics;
   private Map<String, Object> configUpdates;
 
-  protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+  protected BeamKafkaTable(BeamSqlRecordType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
-  public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+  public BeamKafkaTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
     super(beamSqlRowType);
     this.bootstrapServers = bootstrapServers;
@@ -69,14 +69,14 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
     return BeamIOType.UNBOUNDED;
   }
 
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
       getPTransformForInput();
 
-  public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+  public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput();
 
   @Override
-  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("read",
             KafkaIO.<byte[], byte[]>read()
                 .withBootstrapServers(bootstrapServers)
@@ -89,13 +89,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
     checkArgument(topics != null && topics.size() == 1,
         "Only one topic can be acceptable as output.");
 
-    return new PTransform<PCollection<BeamSqlRow>, PDone>() {
+    return new PTransform<PCollection<BeamRecord>, PDone>() {
       @Override
-      public PDone expand(PCollection<BeamSqlRow> input) {
+      public PDone expand(PCollection<BeamRecord> input) {
         return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
             KafkaIO.<byte[], byte[]>write()
                 .withBootstrapServers(bootstrapServers)

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
index c44faab..0ec418c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
@@ -19,10 +19,10 @@
 package org.apache.beam.sdk.extensions.sql.schema.text;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -46,25 +46,25 @@ public class BeamTextCSVTable extends BeamTextTable {
   /**
    * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
    */
-  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern)  {
+  public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern)  {
     this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
   }
 
-  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
+  public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     super(beamSqlRowType, filePattern);
     this.csvFormat = csvFormat;
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
             new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
     return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
   }
 }
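
And the text-backed equivalent (the file pattern is a placeholder; CSVFormat.DEFAULT is used unless one is passed):

    import org.apache.beam.sdk.extensions.sql.schema.text.BeamTextCSVTable;

    BeamTextCSVTable csvTable = new BeamTextCSVTable(type, "/tmp/orders/*.csv");
    PCollection<BeamRecord> rows = csvTable.buildIOReader(pipeline);  // TextIO.read + parseCSVLine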

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
index 06109c3..ecb77e0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -19,12 +19,12 @@
 package org.apache.beam.sdk.extensions.sql.schema.text;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.csv.CSVFormat;
 
@@ -32,13 +32,13 @@ import org.apache.commons.csv.CSVFormat;
  * IOReader for {@code BeamTextCSVTable}.
  */
 public class BeamTextCSVTableIOReader
-    extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
+    extends PTransform<PCollection<String>, PCollection<BeamRecord>>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRowType beamSqlRowType;
+  protected BeamSqlRecordType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
+  public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRowType = beamSqlRowType;
@@ -46,8 +46,8 @@ public class BeamTextCSVTableIOReader
   }
 
   @Override
-  public PCollection<BeamSqlRow> expand(PCollection<String> input) {
-    return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() {
+  public PCollection<BeamRecord> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new DoFn<String, BeamRecord>() {
           @ProcessElement
           public void processElement(ProcessContext ctx) {
             String str = ctx.element();

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
index 1684b37..c616973 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -19,13 +19,13 @@
 package org.apache.beam.sdk.extensions.sql.schema.text;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.commons.csv.CSVFormat;
@@ -33,24 +33,24 @@ import org.apache.commons.csv.CSVFormat;
 /**
  * IOWriter for {@code BeamTextCSVTable}.
  */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRowType beamSqlRowType;
+  protected BeamSqlRecordType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
+  public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRowType = beamSqlRowType;
     this.csvFormat = csvFormat;
   }
 
-  @Override public PDone expand(PCollection<BeamSqlRow> input) {
-    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() {
+  @Override public PDone expand(PCollection<BeamRecord> input) {
+    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() {
 
       @ProcessElement public void processElement(ProcessContext ctx) {
-        BeamSqlRow row = ctx.element();
+        BeamRecord row = ctx.element();
         ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat));
       }
     })).apply(TextIO.write().to(filePattern));

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
index e85608d..4284366 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text;
 import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 
 /**
  * {@code BeamTextTable} represents a text file/directory (backed by {@code TextIO}).
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
 public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
   protected String filePattern;
 
-  protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+  protected BeamTextTable(BeamSqlRecordType beamSqlRowType, String filePattern) {
     super(beamSqlRowType);
     this.filePattern = filePattern;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index e6ca18f..8501157 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -49,16 +49,16 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     runAggregationWithoutWindow(unboundedInput1);
   }
 
-  private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
+  private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("f_int2", 0);
     record.addField("size", 4L);
 
@@ -83,7 +83,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     runAggregationFunctions(unboundedInput1);
   }
 
-  private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
+  private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception{
     String sql = "select f_int2, count(*) as size, "
         + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
         + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
@@ -94,11 +94,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         + "max(f_timestamp) as max6, min(f_timestamp) as min6 "
         + "FROM TABLE_A group by f_int2";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testAggregationFunctions", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
         Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
             "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
             "max5", "min5", "max6", "min6"),
@@ -108,7 +108,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
             Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
             Types.TIMESTAMP, Types.TIMESTAMP));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("f_int2", 0);
     record.addField("size", 4L);
 
@@ -161,28 +161,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     runDistinct(unboundedInput1);
   }
 
-  private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
+  private void runDistinct(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testDistinct", BeamSql.simpleQuery(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int", 1);
     record1.addField("f_long", 1000L);
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int", 2);
     record2.addField("f_long", 2000L);
 
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int", 3);
     record3.addField("f_long", 3000L);
 
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int", 4);
     record4.addField("f_long", 4000L);
 
@@ -207,27 +207,27 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     runTumbleWindow(unboundedInput1);
   }
 
-  private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
+  private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size`,"
         + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
         + " FROM TABLE_A"
         + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testTumbleWindow", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
     record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
     record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
@@ -255,40 +255,40 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     runHopWindow(unboundedInput1);
   }
 
-  private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
+  private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size`,"
         + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
         + " FROM PCOLLECTION"
         + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testHopWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
     record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
     record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 3L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
     record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
     record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int2", 0);
     record3.addField("size", 1L);
     record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
     record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
     record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
 
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int2", 0);
     record4.addField("size", 1L);
     record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
@@ -316,27 +316,27 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     runSessionWindow(unboundedInput1);
   }
 
-  private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
+  private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `size`,"
         + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
         + " FROM TABLE_A"
         + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testSessionWindow", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
     record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
     record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
@@ -357,8 +357,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
 
     String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
         + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
         .apply("testWindowOnNonTimestampField", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();
@@ -372,7 +372,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
 
     String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();

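Every hunk above repeats the same swap: the SQL-extension pair
BeamSqlRow/BeamSqlRowType becomes the SDK-level BeamRecord plus the renamed
BeamSqlRecordType. A minimal sketch of the post-refactor construction, using
only calls that appear in the hunks above (the type and field names are copied
from the first hunk, not an independent example):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.values.BeamRecord;

    public class RecordRenameSketch {
      public static void main(String[] args) {
        // Still created from parallel name/type lists, under the new name.
        BeamSqlRecordType resultType = BeamSqlRecordType.create(
            Arrays.asList("f_int2", "size"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));

        // Rows are now the SDK-level BeamRecord, built against that type.
        BeamRecord record = new BeamRecord(resultType);
        record.addField("f_int2", 0);
        record.addField("size", 4L);
      }
    }
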
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 0c1ce1c..d09caf0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -25,12 +25,11 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -53,20 +52,20 @@ public class BeamSqlDslBase {
   @Rule
   public ExpectedException exceptions = ExpectedException.none();
 
-  public static BeamSqlRowType rowTypeInTableA;
-  public static List<BeamSqlRow> recordsInTableA;
+  public static BeamSqlRecordType rowTypeInTableA;
+  public static List<BeamRecord> recordsInTableA;
 
   //bounded PCollections
-  public PCollection<BeamSqlRow> boundedInput1;
-  public PCollection<BeamSqlRow> boundedInput2;
+  public PCollection<BeamRecord> boundedInput1;
+  public PCollection<BeamRecord> boundedInput2;
 
   //unbounded PCollections
-  public PCollection<BeamSqlRow> unboundedInput1;
-  public PCollection<BeamSqlRow> unboundedInput2;
+  public PCollection<BeamRecord> unboundedInput1;
+  public PCollection<BeamRecord> unboundedInput2;
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
-    rowTypeInTableA = BeamSqlRowType.create(
+    rowTypeInTableA = BeamSqlRecordType.create(
         Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
             "f_timestamp", "f_int2", "f_decimal"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
@@ -78,20 +77,20 @@ public class BeamSqlDslBase {
   @Before
   public void preparePCollections(){
     boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
-        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+        Create.of(recordsInTableA).withCoder(rowTypeInTableA.getRecordCoder()));
 
     boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
-        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+        Create.of(recordsInTableA.get(0)).withCoder(rowTypeInTableA.getRecordCoder()));
 
     unboundedInput1 = prepareUnboundedPCollection1();
     unboundedInput2 = prepareUnboundedPCollection2();
   }
 
-  private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
-    TestStream.Builder<BeamSqlRow> values = TestStream
-        .create(new BeamSqlRowCoder(rowTypeInTableA));
+  private PCollection<BeamRecord> prepareUnboundedPCollection1() {
+    TestStream.Builder<BeamRecord> values = TestStream
+        .create(rowTypeInTableA.getRecordCoder());
 
-    for (BeamSqlRow row : recordsInTableA) {
+    for (BeamRecord row : recordsInTableA) {
       values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
       values = values.addElements(row);
     }
@@ -99,21 +98,21 @@ public class BeamSqlDslBase {
     return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
   }
 
-  private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
-    TestStream.Builder<BeamSqlRow> values = TestStream
-        .create(new BeamSqlRowCoder(rowTypeInTableA));
+  private PCollection<BeamRecord> prepareUnboundedPCollection2() {
+    TestStream.Builder<BeamRecord> values = TestStream
+        .create(rowTypeInTableA.getRecordCoder());
 
-    BeamSqlRow row = recordsInTableA.get(0);
+    BeamRecord row = recordsInTableA.get(0);
     values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
     values = values.addElements(row);
 
     return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
   }
 
-  private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
-    List<BeamSqlRow> rows = new ArrayList<>();
+  private static List<BeamRecord> prepareInputRowsInTableA() throws ParseException {
+    List<BeamRecord> rows = new ArrayList<>();
 
-    BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
+    BeamRecord row1 = new BeamRecord(rowTypeInTableA);
     row1.addField(0, 1);
     row1.addField(1, 1000L);
     row1.addField(2, Short.valueOf("1"));
@@ -126,7 +125,7 @@ public class BeamSqlDslBase {
     row1.addField(9, new BigDecimal(1));
     rows.add(row1);
 
-    BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
+    BeamRecord row2 = new BeamRecord(rowTypeInTableA);
     row2.addField(0, 2);
     row2.addField(1, 2000L);
     row2.addField(2, Short.valueOf("2"));
@@ -139,7 +138,7 @@ public class BeamSqlDslBase {
     row2.addField(9, new BigDecimal(2));
     rows.add(row2);
 
-    BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
+    BeamRecord row3 = new BeamRecord(rowTypeInTableA);
     row3.addField(0, 3);
     row3.addField(1, 3000L);
     row3.addField(2, Short.valueOf("3"));
@@ -152,7 +151,7 @@ public class BeamSqlDslBase {
     row3.addField(9, new BigDecimal(3));
     rows.add(row3);
 
-    BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
+    BeamRecord row4 = new BeamRecord(rowTypeInTableA);
     row4.addField(0, 4);
     row4.addField(1, 4000L);
     row4.addField(2, Short.valueOf("4"));

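Besides the rename, this file shows the second mechanical change of the commit:
coders are no longer constructed as new BeamSqlRowCoder(rowType) but are handed
out by the record type itself. A minimal sketch, assuming only the calls
visible in the hunks above:

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.coders.BeamRecordCoder;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;

    public class CoderSketch {
      public static void main(String[] args) {
        BeamSqlRecordType rowType = BeamSqlRecordType.create(
            Arrays.asList("f_int", "f_long"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));

        // Old: new BeamSqlRowCoder(rowType), a SQL-extension class.
        // New: the type hands out an SDK-level BeamRecordCoder directly.
        BeamRecordCoder coder = rowType.getRecordCoder();
      }
    }
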
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
index 16b6426..e1d463b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -44,10 +44,10 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     runSingleFilter(unboundedInput1);
   }
 
-  private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
+  private void runSingleFilter(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -71,12 +71,12 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     runCompositeFilter(unboundedInput1);
   }
 
-  private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
+  private void runCompositeFilter(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM TABLE_A"
         + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testCompositeFilter", BeamSql.query(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
@@ -100,11 +100,11 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     runNoReturnFilter(unboundedInput1);
   }
 
-  private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
+  private void runNoReturnFilter(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testNoReturnFilter", BeamSql.query(sql));
 
     PAssert.that(result).empty();
@@ -120,8 +120,8 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
         .apply("testFromInvalidTableName1", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();
@@ -135,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM PCOLLECTION_NA";
 
-    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }
@@ -148,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
 
-    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }

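Both DSL entry points keep their shape through the refactor; only the element
type changes. A sketch of the two call forms as they read after this diff, with
the table and field names copied from the test and the input pipeline left to
the caller:

    import org.apache.beam.sdk.extensions.sql.BeamSql;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class QueryFormsSketch {
      // Single-input form: the collection is implicitly named PCOLLECTION.
      static PCollection<BeamRecord> filterSimple(PCollection<BeamRecord> input) {
        return input.apply("singleFilter",
            BeamSql.simpleQuery("SELECT * FROM PCOLLECTION WHERE f_int = 1"));
      }

      // Multi-input form: inputs are named through TupleTags, which are now
      // typed to BeamRecord instead of BeamSqlRow.
      static PCollection<BeamRecord> filterNamed(PCollection<BeamRecord> input) {
        return PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("compositeFilter",
                BeamSql.query("SELECT * FROM TABLE_A WHERE f_int > 1"));
      }
    }
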
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index d75af9b..d5d0a24 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -23,11 +23,11 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
-  private static final BeamSqlRowType SOURCE_RECORD_TYPE =
-      BeamSqlRowType.create(
+  private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
+      BeamSqlRecordType.create(
           Arrays.asList(
               "order_id", "site_id", "price"
           ),
@@ -51,11 +51,10 @@ public class BeamSqlDslJoinTest {
           )
       );
 
-  private static final BeamSqlRowCoder SOURCE_CODER =
-      new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
+  private static final BeamRecordCoder SOURCE_CODER = SOURCE_RECORD_TYPE.getRecordCoder();
 
-  private static final BeamSqlRowType RESULT_RECORD_TYPE =
-      BeamSqlRowType.create(
+  private static final BeamSqlRecordType RESULT_RECORD_TYPE =
+      BeamSqlRecordType.create(
           Arrays.asList(
           "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
           ),
@@ -65,8 +64,7 @@ public class BeamSqlDslJoinTest {
           )
       );
 
-  private static final BeamSqlRowCoder RESULT_CODER =
-      new BeamSqlRowCoder(RESULT_RECORD_TYPE);
+  private static final BeamRecordCoder RESULT_CODER = RESULT_RECORD_TYPE.getRecordCoder();
 
   @Test
   public void testInnerJoin() throws Exception {
@@ -178,13 +176,13 @@ public class BeamSqlDslJoinTest {
     pipeline.run();
   }
 
-  private PCollection<BeamSqlRow> queryFromOrderTables(String sql) {
+  private PCollection<BeamRecord> queryFromOrderTables(String sql) {
     return PCollectionTuple
         .of(
-            new TupleTag<BeamSqlRow>("ORDER_DETAILS1"),
+            new TupleTag<BeamRecord>("ORDER_DETAILS1"),
             ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)
         )
-        .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"),
+        .and(new TupleTag<BeamRecord>("ORDER_DETAILS2"),
             ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)
         ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER);
   }

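In the join test the two coder constants collapse to one-liners off the record
types. A sketch of the updated constants; the field names are copied from the
hunk above, but the JDBC type constants are placeholders, since the actual type
list sits outside the hunk:

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.coders.BeamRecordCoder;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;

    public class JoinCoderSketch {
      static final BeamSqlRecordType SOURCE_RECORD_TYPE =
          BeamSqlRecordType.create(
              Arrays.asList("order_id", "site_id", "price"),
              // Placeholder types; the real list is outside the diff hunk.
              Arrays.asList(Types.INTEGER, Types.INTEGER, Types.DOUBLE));

      // Was: new BeamSqlRowCoder(SOURCE_RECORD_TYPE).
      static final BeamRecordCoder SOURCE_CODER =
          SOURCE_RECORD_TYPE.getRecordCoder();
    }
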
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index 6468011..ddb90d5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -47,10 +47,10 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runSelectAll(unboundedInput2);
   }
 
-  private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
+  private void runSelectAll(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM PCOLLECTION";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testSelectAll", BeamSql.simpleQuery(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -74,17 +74,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runPartialFields(unboundedInput2);
   }
 
-  private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
+  private void runPartialFields(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testPartialFields", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
     record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
 
@@ -109,29 +109,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runPartialFieldsInMultipleRow(unboundedInput1);
   }
 
-  private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
+  private void runPartialFieldsInMultipleRow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
     record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
     record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
 
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
     record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
 
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
     record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
 
@@ -156,29 +156,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runPartialFieldsInRows(unboundedInput1);
   }
 
-  private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
+  private void runPartialFieldsInRows(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testPartialFieldsInRows", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
     record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
     record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
 
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
     record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
 
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
     record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
 
@@ -203,17 +203,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runLiteralField(unboundedInput2);
   }
 
-  public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
+  public void runLiteralField(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT 1 as literal_field FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testLiteralField", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("literal_field", 1);
 
     PAssert.that(result).containsInAnyOrder(record);
@@ -229,8 +229,8 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     String sql = "SELECT f_int_na FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
         .apply("testProjectUnknownField", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();