You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:46 UTC

[3/7] beam git commit: refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 46cab09..e3c6aec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -20,11 +20,11 @@ package org.apache.beam.sdk.extensions.sql;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -39,24 +39,24 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdaf() throws Exception {
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("f_int2", 0);
     record.addField("squaresum", 30);
 
     String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
         + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<BeamSqlRow> result1 =
+    PCollection<BeamRecord> result1 =
         boundedInput1.apply("testUdaf1",
             BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class));
     PAssert.that(result1).containsInAnyOrder(record);
 
     String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`"
         + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<BeamSqlRow> result2 =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+    PCollection<BeamRecord> result2 =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1)
         .apply("testUdaf2",
             BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class));
     PAssert.that(result2).containsInAnyOrder(record);
@@ -69,22 +69,22 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdf() throws Exception{
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("f_int", 2);
     record.addField("cubicvalue", 8);
 
     String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<BeamSqlRow> result1 =
+    PCollection<BeamRecord> result1 =
         boundedInput1.apply("testUdf1",
             BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
     PAssert.that(result1).containsInAnyOrder(record);
 
     String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<BeamSqlRow> result2 =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+    PCollection<BeamRecord> result2 =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1)
         .apply("testUdf2",
             BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
     PAssert.that(result2).containsInAnyOrder(record);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 9995b0a..63b6ca8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -21,9 +21,9 @@ package org.apache.beam.sdk.extensions.sql;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.BeamRecord;
 
 /**
  * Test utilities.
@@ -32,7 +32,7 @@ public class TestUtils {
   /**
    * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
    */
-  public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
+  public static class BeamSqlRow2StringDoFn extends DoFn<BeamRecord, String> {
     @ProcessElement
     public void processElement(ProcessContext ctx) {
       ctx.output(ctx.element().valueInString());
@@ -42,9 +42,9 @@ public class TestUtils {
   /**
    * Convert list of {@code BeamSqlRow} to list of {@code String}.
    */
-  public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
+  public static List<String> beamSqlRows2Strings(List<BeamRecord> rows) {
     List<String> strs = new ArrayList<>();
-    for (BeamSqlRow row : rows) {
+    for (BeamRecord row : rows) {
       strs.add(row.valueInString());
     }
 
@@ -69,8 +69,8 @@ public class TestUtils {
    * {@code}
    */
   public static class RowsBuilder {
-    private BeamSqlRowType type;
-    private List<BeamSqlRow> rows = new ArrayList<>();
+    private BeamSqlRecordType type;
+    private List<BeamRecord> rows = new ArrayList<>();
 
     /**
      * Create a RowsBuilder with the specified row type info.
@@ -86,7 +86,7 @@ public class TestUtils {
      * @args pairs of column type and column names.
      */
     public static RowsBuilder of(final Object... args) {
-      BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
+      BeamSqlRecordType beamSQLRowType = buildBeamSqlRowType(args);
       RowsBuilder builder = new RowsBuilder();
       builder.type = beamSQLRowType;
 
@@ -103,7 +103,7 @@ public class TestUtils {
      * )}</pre>
      * @beamSQLRowType the record type.
      */
-    public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
+    public static RowsBuilder of(final BeamSqlRecordType beamSQLRowType) {
       RowsBuilder builder = new RowsBuilder();
       builder.type = beamSQLRowType;
 
@@ -130,7 +130,7 @@ public class TestUtils {
       return this;
     }
 
-    public List<BeamSqlRow> getRows() {
+    public List<BeamRecord> getRows() {
       return rows;
     }
 
@@ -153,7 +153,7 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
+  public static BeamSqlRecordType buildBeamSqlRowType(Object... args) {
     List<Integer> types = new ArrayList<>();
     List<String> names = new ArrayList<>();
 
@@ -162,7 +162,7 @@ public class TestUtils {
       names.add((String) args[i + 1]);
     }
 
-    return BeamSqlRowType.create(names, types);
+    return BeamSqlRecordType.create(names, types);
   }
 
   /**
@@ -179,12 +179,12 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
-    List<BeamSqlRow> rows = new ArrayList<>();
+  public static List<BeamRecord> buildRows(BeamSqlRecordType type, List args) {
+    List<BeamRecord> rows = new ArrayList<>();
     int fieldCount = type.size();
 
     for (int i = 0; i < args.size(); i += fieldCount) {
-      BeamSqlRow row = new BeamSqlRow(type);
+      BeamRecord row = new BeamRecord(type);
       for (int j = 0; j < fieldCount; j++) {
         row.addField(j, args.get(i + j));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 388c556..4da7790 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -57,8 +57,8 @@ public class BeamSqlFnExecutorTestBase {
       RelDataTypeSystem.DEFAULT);
   public static RelDataType relDataType;
 
-  public static BeamSqlRowType beamRowType;
-  public static BeamSqlRow record;
+  public static BeamSqlRecordType beamRowType;
+  public static BeamRecord record;
 
   public static RelBuilder relBuilder;
 
@@ -71,7 +71,7 @@ public class BeamSqlFnExecutorTestBase {
         .add("order_time", SqlTypeName.BIGINT).build();
 
     beamRowType = CalciteUtils.toBeamRowType(relDataType);
-    record = new BeamSqlRow(beamRowType);
+    record = new BeamRecord(beamRowType);
 
     record.addField(0, 1234567L);
     record.addField(1, 0);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index 5a3f65d..a51cc30 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -77,7 +77,7 @@ public class BeamIntersectRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -100,7 +100,7 @@ public class BeamIntersectRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).satisfies(new CheckSize(3));
 
     PAssert.that(rows).containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index c4f6350..dde1540 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -77,7 +77,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
         + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "order_id",
@@ -102,7 +102,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     pipeline.enableAbandonedNodeEnforcement(false);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
@@ -130,7 +130,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "order_id",
@@ -157,7 +157,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
           Types.INTEGER, "order_id",

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 1dbd8b4..28ad99c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -26,10 +26,10 @@ import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.junit.BeforeClass;
@@ -98,7 +98,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -124,7 +124,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -150,7 +150,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -192,7 +192,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " on "
         + " o1.order_id=o2.order_id"
         ;
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 5e5e416..a5a2e85 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -25,10 +25,10 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.junit.BeforeClass;
@@ -88,7 +88,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -121,7 +121,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
     // 2, 2 | 2, 5
     // 3, 3 | NULL, NULL
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -151,7 +151,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -181,7 +181,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id1=o2.order_id"
         ;
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 9149dd4..425e554 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -78,7 +78,7 @@ public class BeamMinusRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -100,7 +100,7 @@ public class BeamMinusRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).satisfies(new CheckSize(2));
 
     PAssert.that(rows).containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
index 36538c0..4de493a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -25,11 +25,11 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -71,7 +71,7 @@ public class BeamSetOperatorRelBaseTest {
         + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     // compare valueInString to ignore the windowStart & windowEnd
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 15e3b89..f033fa0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -24,9 +24,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
@@ -78,7 +78,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
         Types.BIGINT, "order_id",
         Types.INTEGER, "site_id",
@@ -117,7 +117,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -155,7 +155,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -178,7 +178,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -201,7 +201,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 11";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index c232b30..7cc52da 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -63,7 +63,7 @@ public class BeamUnionRelTest {
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -86,7 +86,7 @@ public class BeamUnionRelTest {
         + " SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index e5fa864..ff31e55 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -60,7 +60,7 @@ public class BeamValuesRelTest {
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.VARCHAR, "name",
@@ -76,7 +76,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "c0",
@@ -91,7 +91,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "EXPR$0",

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
index 8cdf2cd..7407a76 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
@@ -18,21 +18,21 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.junit.Assert;
 
 /**
  * Utility class to check size of BeamSQLRow iterable.
  */
-public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+public class CheckSize implements SerializableFunction<Iterable<BeamRecord>, Void> {
   private int size;
   public CheckSize(int size) {
     this.size = size;
   }
-  @Override public Void apply(Iterable<BeamSqlRow> input) {
+  @Override public Void apply(Iterable<BeamRecord> input) {
     int count = 0;
-    for (BeamSqlRow row : input) {
+    for (BeamRecord row : input) {
       count++;
     }
     Assert.assertEquals(size, count);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index ffc6833..b58a17f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,11 +32,10 @@ import java.util.TimeZone;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.util.Pair;
 import org.junit.Rule;
@@ -62,8 +61,8 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
-  protected PCollection<BeamSqlRow> getTestPCollection() {
-    BeamSqlRowType type = BeamSqlRowType.create(
+  protected PCollection<BeamRecord> getTestPCollection() {
+    BeamSqlRecordType type = BeamSqlRecordType.create(
         Arrays.asList("ts", "c_tinyint", "c_smallint",
             "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
             "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
@@ -89,7 +88,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
               9223372036854775807L
           )
           .buildIOReader(pipeline)
-          .setCoder(new BeamSqlRowCoder(type));
+          .setCoder(type.getRecordCoder());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -140,7 +139,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
      * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
      */
     public void buildRunAndCheck() {
-      PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+      PCollection<BeamRecord> inputCollection = getTestPCollection();
       System.out.println("SQL:>\n" + getSql());
       try {
         List<String> names = new ArrayList<>();
@@ -153,10 +152,10 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
           values.add(pair.getValue());
         }
 
-        PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+        PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
         PAssert.that(rows).containsInAnyOrder(
             TestUtils.RowsBuilder
-                .of(BeamSqlRowType.create(names, types))
+                .of(BeamSqlRecordType.create(names, types))
                 .addRows(values)
                 .getRows()
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index 14de5b6..3569e31 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -22,9 +22,8 @@ import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.Arrays;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;
 
@@ -282,8 +281,8 @@ public class BeamSqlComparisonOperatorsIntegrationTest
     checker.buildRunAndCheck();
   }
 
-  @Override protected PCollection<BeamSqlRow> getTestPCollection() {
-    BeamSqlRowType type = BeamSqlRowType.create(
+  @Override protected PCollection<BeamRecord> getTestPCollection() {
+    BeamSqlRecordType type = BeamSqlRecordType.create(
         Arrays.asList(
             "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
             "c_smallint_0", "c_smallint_1", "c_smallint_2",
@@ -322,7 +321,7 @@ public class BeamSqlComparisonOperatorsIntegrationTest
               false, true
           )
           .buildIOReader(pipeline)
-          .setCoder(new BeamSqlRowCoder(type));
+          .setCoder(type.getRecordCoder());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 181c991..cda6a3c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
 import java.util.Date;
 import java.util.Iterator;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;
 
@@ -63,17 +63,17 @@ public class BeamSqlDateFunctionsIntegrationTest
         + "CURRENT_TIMESTAMP as c3"
         + " FROM PCOLLECTION"
         ;
-    PCollection<BeamSqlRow> rows = getTestPCollection().apply(
+    PCollection<BeamRecord> rows = getTestPCollection().apply(
         BeamSql.simpleQuery(sql));
     PAssert.that(rows).satisfies(new Checker());
     pipeline.run();
   }
 
-  private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
-    @Override public Void apply(Iterable<BeamSqlRow> input) {
-      Iterator<BeamSqlRow> iter = input.iterator();
+  private static class Checker implements SerializableFunction<Iterable<BeamRecord>, Void> {
+    @Override public Void apply(Iterable<BeamRecord> input) {
+      Iterator<BeamRecord> iter = input.iterator();
       assertTrue(iter.hasNext());
-      BeamSqlRow row = iter.next();
+      BeamRecord row = iter.next();
         // LOCALTIME
       Date date = new Date();
       assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000);

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index c7c26eb..073ca52 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -26,12 +26,12 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -41,11 +41,11 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class MockedBoundedTable extends MockedTable {
   /** rows written to this table. */
-  private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
+  private static final ConcurrentLinkedQueue<BeamRecord> CONTENT = new ConcurrentLinkedQueue<>();
   /** rows flow out from this table. */
-  private final List<BeamSqlRow> rows = new ArrayList<>();
+  private final List<BeamRecord> rows = new ArrayList<>();
 
-  public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
+  public MockedBoundedTable(BeamSqlRecordType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
@@ -69,7 +69,7 @@ public class MockedBoundedTable extends MockedTable {
   /**
    * Build a mocked bounded table with the specified type.
    */
-  public static MockedBoundedTable of(final BeamSqlRowType type) {
+  public static MockedBoundedTable of(final BeamSqlRecordType type) {
     return new MockedBoundedTable(type);
   }
 
@@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
    * }</pre>
    */
   public MockedBoundedTable addRows(Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
+    List<BeamRecord> rows = buildRows(getRowType(), Arrays.asList(args));
     this.rows.addAll(rows);
     return this;
   }
@@ -99,12 +99,12 @@ public class MockedBoundedTable extends MockedTable {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply(
         "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
   }
 
-  @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+  @Override public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
     return new OutputStore();
   }
 
@@ -112,11 +112,11 @@ public class MockedBoundedTable extends MockedTable {
    * Keep output in {@code CONTENT} for validation.
    *
    */
-  public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
+  public static class OutputStore extends PTransform<PCollection<BeamRecord>, PDone> {
 
     @Override
-    public PDone expand(PCollection<BeamSqlRow> input) {
-      input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
+    public PDone expand(PCollection<BeamRecord> input) {
+      input.apply(ParDo.of(new DoFn<BeamRecord, Void>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
           CONTENT.add(c.element());

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 6017ee7..59fc6e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.extensions.sql.mock;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
@@ -31,12 +31,12 @@ import org.apache.beam.sdk.values.PDone;
  */
 public abstract class MockedTable extends BaseBeamTable {
   public static final AtomicInteger COUNTER = new AtomicInteger();
-  public MockedTable(BeamSqlRowType beamSqlRowType) {
+  public MockedTable(BeamSqlRecordType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
     throw new UnsupportedOperationException("buildIOWriter unsupported!");
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
index f9ea2ac..6194264 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -24,10 +24,9 @@ import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.calcite.util.Pair;
@@ -39,10 +38,10 @@ import org.joda.time.Instant;
  */
 public class MockedUnboundedTable extends MockedTable {
   /** rows flow out from this table with the specified watermark instant. */
-  private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+  private final List<Pair<Duration, List<BeamRecord>>> timestampedRows = new ArrayList<>();
   /** specify the index of column in the row which stands for the event time field. */
   private int timestampField;
-  private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
+  private MockedUnboundedTable(BeamSqlRecordType beamSqlRowType) {
     super(beamSqlRowType);
   }
 
@@ -83,7 +82,7 @@ public class MockedUnboundedTable extends MockedTable {
    * }</pre>
    */
   public MockedUnboundedTable addRows(Duration duration, Object... args) {
-    List<BeamSqlRow> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args));
+    List<BeamRecord> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args));
     // record the watermark + rows
     this.timestampedRows.add(Pair.of(duration, rows));
     return this;
@@ -93,11 +92,10 @@ public class MockedUnboundedTable extends MockedTable {
     return BeamIOType.UNBOUNDED;
   }
 
-  @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
-    TestStream.Builder<BeamSqlRow> values = TestStream.create(
-        new BeamSqlRowCoder(beamSqlRowType));
+  @Override public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+    TestStream.Builder<BeamRecord> values = TestStream.create(beamSqlRowType.getRecordCoder());
 
-    for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+    for (Pair<Duration, List<BeamRecord>> pair : timestampedRows) {
       values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
       for (int i = 0; i < pair.getValue().size(); i++) {
         values = values.addElements(TimestampedValue.of(pair.getValue().get(i),

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
index ddff819..08f98c3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
@@ -21,8 +21,10 @@ package org.apache.beam.sdk.extensions.sql.schema;
 import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -57,10 +59,10 @@ public class BeamSqlRowCoderTest {
       }
     };
 
-    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(
+    BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(
         protoRowType.apply(new JavaTypeFactoryImpl(
             RelDataTypeSystem.DEFAULT)));
-    BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
+    BeamRecord row = new BeamRecord(beamSQLRowType);
     row.addField("col_tinyint", Byte.valueOf("1"));
     row.addField("col_smallint", Short.valueOf("1"));
     row.addField("col_integer", 1);
@@ -76,7 +78,7 @@ public class BeamSqlRowCoderTest {
     row.addField("col_boolean", true);
 
 
-    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType);
+    BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
     CoderProperties.coderDecodeEncodeEqual(coder, row);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 05af36c..2fc013d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -21,13 +21,13 @@ package org.apache.beam.sdk.extensions.sql.schema.kafka;
 import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.type.RelDataType;
@@ -45,8 +45,8 @@ import org.junit.Test;
 public class BeamKafkaCSVTableTest {
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
-  public static BeamSqlRow row1 = new BeamSqlRow(genRowType());
-  public static BeamSqlRow row2 = new BeamSqlRow(genRowType());
+  public static BeamRecord row1 = new BeamRecord(genRowType());
+  public static BeamRecord row2 = new BeamRecord(genRowType());
 
   @BeforeClass
   public static void setUp() {
@@ -60,7 +60,7 @@ public class BeamKafkaCSVTableTest {
   }
 
   @Test public void testCsvRecorderDecoder() throws Exception {
-    PCollection<BeamSqlRow> result = pipeline
+    PCollection<BeamRecord> result = pipeline
         .apply(
             Create.of("1,\"1\",1.0", "2,2,2.0")
         )
@@ -75,7 +75,7 @@ public class BeamKafkaCSVTableTest {
   }
 
   @Test public void testCsvRecorderEncoder() throws Exception {
-    PCollection<BeamSqlRow> result = pipeline
+    PCollection<BeamRecord> result = pipeline
         .apply(
             Create.of(row1, row2)
         )
@@ -90,7 +90,7 @@ public class BeamKafkaCSVTableTest {
     pipeline.run();
   }
 
-  private static BeamSqlRowType genRowType() {
+  private static BeamSqlRecordType genRowType() {
     return CalciteUtils.toBeamRowType(new RelProtoDataType() {
 
       @Override public RelDataType apply(RelDataTypeFactory a0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
index 79e3d6d..4a39f7c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
@@ -33,10 +33,10 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -69,7 +69,7 @@ public class BeamTextCSVTableTest {
   private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
 
   private static List<Object[]> testData = Arrays.asList(data1, data2);
-  private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{
+  private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{
     for (Object[] data : testData) {
       add(buildRow(data));
     }
@@ -80,7 +80,7 @@ public class BeamTextCSVTableTest {
   private static File writerTargetFile;
 
   @Test public void testBuildIOReader() {
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+    PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
         readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
     PAssert.that(rows).containsInAnyOrder(testDataRows);
     pipeline.run();
@@ -93,7 +93,7 @@ public class BeamTextCSVTableTest {
             .buildIOWriter());
     pipeline.run();
 
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+    PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
         writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
 
     // confirm the two reads match
@@ -166,11 +166,11 @@ public class BeamTextCSVTableTest {
         .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
   }
 
-  private static BeamSqlRowType buildBeamSqlRowType() {
+  private static BeamSqlRecordType buildBeamSqlRowType() {
     return CalciteUtils.toBeamRowType(buildRelDataType());
   }
 
-  private static BeamSqlRow buildRow(Object[] data) {
-    return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
+  private static BeamRecord buildRow(Object[] data) {
+    return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
index 821abc9..dca2ad7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
@@ -21,14 +21,13 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -63,14 +63,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
 
   private List<AggregateCall> aggCalls;
 
-  private BeamSqlRowType keyType;
-  private BeamSqlRowType aggPartType;
-  private BeamSqlRowType outputType;
+  private BeamSqlRecordType keyType;
+  private BeamSqlRecordType aggPartType;
+  private BeamSqlRecordType outputType;
 
-  private BeamSqlRowCoder inRecordCoder;
-  private BeamSqlRowCoder keyCoder;
-  private BeamSqlRowCoder aggCoder;
-  private BeamSqlRowCoder outRecordCoder;
+  private BeamRecordCoder inRecordCoder;
+  private BeamRecordCoder keyCoder;
+  private BeamRecordCoder aggCoder;
+  private BeamRecordCoder outRecordCoder;
 
   /**
    * This step equals to below query.
@@ -99,28 +99,28 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   public void testCountPerElementBasic() throws ParseException {
     setupEnvironment();
 
-    PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows));
+    PCollection<BeamRecord> input = p.apply(Create.of(inputRows));
 
     //1. extract fields in group-by key part
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy",
+    PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder));
+        .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder));
 
     //2. apply a GroupByKey.
-    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
-        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
-            IterableCoder.<BeamSqlRow>of(inRecordCoder)));
+    PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream
+        .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create())
+        .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder,
+            IterableCoder.<BeamRecord>of(inRecordCoder)));
 
     //3. run aggregation functions
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
-        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
+    PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation",
+        Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues(
             new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
+        .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder));
 
     //4. flat KV to a single record
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
+    PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
     mergedStream.setCoder(outRecordCoder);
 
@@ -332,10 +332,10 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
    * Coders used in aggregation steps.
    */
   private void prepareTypeAndCoder() {
-    inRecordCoder = new BeamSqlRowCoder(inputRowType);
+    inRecordCoder = inputRowType.getRecordCoder();
 
     keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
-    keyCoder = new BeamSqlRowCoder(keyType);
+    keyCoder = keyType.getRecordCoder();
 
     aggPartType = initTypeOfSqlRow(
         Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
@@ -360,35 +360,35 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
             KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
             KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
             ));
-    aggCoder = new BeamSqlRowCoder(aggPartType);
+    aggCoder = aggPartType.getRecordCoder();
 
     outputType = prepareFinalRowType();
-    outRecordCoder = new BeamSqlRowCoder(outputType);
+    outRecordCoder = outputType.getRecordCoder();
   }
 
   /**
    * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
    */
-  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() {
+  private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() {
     return Arrays.asList(
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
             inputRows.get(0)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
+        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
             inputRows.get(1)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
+        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
             inputRows.get(2)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
+        KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
             inputRows.get(3)));
   }
 
   /**
    * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
    */
-  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn()
+  private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn()
       throws ParseException {
     return Arrays.asList(
-            KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-                new BeamSqlRow(aggPartType, Arrays.<Object>asList(
+            KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+                new BeamRecord(aggPartType, Arrays.<Object>asList(
                     4L,
                     10000L, 2500L, 4000L, 1000L,
                     (short) 10, (short) 2, (short) 4, (short) 1,
@@ -404,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   /**
    * Row type of final output row.
    */
-  private BeamSqlRowType prepareFinalRowType() {
+  private BeamSqlRecordType prepareFinalRowType() {
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     List<KV<String, SqlTypeName>> columnMetadata =
         Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -438,8 +438,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   /**
    * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
    */
-  private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException {
-    return new BeamSqlRow(outputType, Arrays.<Object>asList(
+  private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException {
+    return new BeamRecord(outputType, Arrays.<Object>asList(
         1, 4L,
         10000L, 2500L, 4000L, 1000L,
         (short) 10, (short) 2, (short) 4, (short) 1,

http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
index af7ec23..e31463b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -38,8 +38,8 @@ import org.junit.BeforeClass;
 public class BeamTransformBaseTest {
   public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  public static BeamSqlRowType inputRowType;
-  public static List<BeamSqlRow> inputRows;
+  public static BeamSqlRecordType inputRowType;
+  public static List<BeamRecord> inputRows;
 
   @BeforeClass
   public static void prepareInput() throws NumberFormatException, ParseException{
@@ -68,7 +68,7 @@ public class BeamTransformBaseTest {
   /**
    * create a {@code BeamSqlRowType} for given column metadata.
    */
-  public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
+  public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
@@ -79,7 +79,7 @@ public class BeamTransformBaseTest {
   /**
    * Create an empty row with given column metadata.
    */
-  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+  public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
     return initBeamSqlRow(columnMetadata, Arrays.asList());
   }
 
@@ -87,11 +87,11 @@ public class BeamTransformBaseTest {
    * Create a row with given column metadata, and values for each column.
    *
    */
-  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
+  public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
       List<Object> rowValues){
-    BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
+    BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata);
 
-    return new BeamSqlRow(rowType, rowValues);
+    return new BeamRecord(rowType, rowValues);
   }
 
 }