You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/15 18:41:57 UTC

[2/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
index 08678d1..456662f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
@@ -40,15 +40,13 @@ public class BeamSqlApiSurfaceTest {
     final Set<String> allowed =
         ImmutableSet.of(
             "org.apache.beam",
-            "org.joda.time",
-            "org.apache.commons.csv");
+            "org.joda.time");
 
     ApiSurface surface = ApiSurface
-        .ofClass(BeamSqlCli.class)
-        .includingClass(BeamSql.class)
-        .includingClass(BeamSqlEnv.class)
-        .includingPackage("org.apache.beam.sdk.extensions.sql.schema",
-            getClass().getClassLoader())
+        .ofClass(BeamSql.class)
+        .includingClass(BeamSqlUdf.class)
+        .includingClass(BeamRecordSqlType.class)
+        .includingClass(BeamSqlRecordHelper.class)
         .pruningPrefix("java")
         .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test")
         .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase");

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index db562da..d99ec20 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index ef75ee2..b27435c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -25,7 +25,6 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index 0876dd9..47109e0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -24,7 +24,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo
 import java.sql.Types;
 import java.util.Arrays;
 import org.apache.beam.sdk.coders.BeamRecordCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.BeamRecord;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index 46aea99..e36eb2b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 1541123..8db9d7a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.extensions.sql;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 373deb7..4a1f8a0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.extensions.sql;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.BeamRecord;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 97905c5..9d12126 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -19,12 +19,12 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
new file mode 100644
index 0000000..906ccfd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Base class for rel tests; compiles SQL into a pipeline via the {@code BeamSqlEnv} planner.
+ */
+public class BaseRelTest {
+  public PCollection<BeamRecord> compilePipeline (
+      String sql, Pipeline pipeline, BeamSqlEnv sqlEnv) throws Exception {
+    return sqlEnv.getPlanner().compileBeamPipeline(sql, pipeline, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index a51cc30..8e41d0a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -19,9 +19,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
 /**
  * Test for {@code BeamIntersectRel}.
  */
-public class BeamIntersectRelTest {
+public class BeamIntersectRelTest extends BaseRelTest {
   static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
@@ -77,7 +76,7 @@ public class BeamIntersectRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -100,7 +99,7 @@ public class BeamIntersectRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).satisfies(new CheckSize(3));
 
     PAssert.that(rows).containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index dde1540..e0d691b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -19,9 +19,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,10 +33,10 @@ import org.junit.Test;
 /**
  * Bounded + Bounded Test for {@code BeamJoinRel}.
  */
-public class BeamJoinRelBoundedVsBoundedTest {
+public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
 
   public static final MockedBoundedTable ORDER_DETAILS1 =
       MockedBoundedTable.of(
@@ -63,8 +62,8 @@ public class BeamJoinRelBoundedVsBoundedTest {
 
   @BeforeClass
   public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
-    beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+    BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    BEAM_SQL_ENV.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
   }
 
   @Test
@@ -77,7 +76,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
         + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "order_id",
@@ -102,7 +101,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.enableAbandonedNodeEnforcement(false);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
@@ -130,7 +129,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "order_id",
@@ -157,7 +156,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
           Types.INTEGER, "order_id",
@@ -187,7 +186,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
         ;
 
     pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.run();
   }
 
@@ -198,7 +197,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
             + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
 
     pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 28ad99c..c5145ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
 import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
@@ -39,10 +38,10 @@ import org.junit.Test;
 /**
  * Unbounded + Unbounded Test for {@code BeamJoinRel}.
  */
-public class BeamJoinRelUnboundedVsBoundedTest {
+public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
   public static final Date FIRST_DATE = new Date(1);
   public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
   public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
@@ -50,7 +49,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
 
   @BeforeClass
   public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+    BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
         .of(
             Types.INTEGER, "order_id",
             Types.INTEGER, "site_id",
@@ -78,7 +77,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         )
     );
 
-    beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
+    BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", MockedBoundedTable
         .of(Types.INTEGER, "order_id",
             Types.VARCHAR, "buyer"
         ).addRows(
@@ -98,7 +97,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -124,7 +123,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -150,7 +149,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -178,7 +177,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
     pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.run();
   }
 
@@ -192,7 +191,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " on "
         + " o1.order_id=o2.order_id"
         ;
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -220,7 +219,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         ;
 
     pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.run();
   }
 
@@ -235,7 +234,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         + " o1.order_id=o2.order_id"
         ;
     pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index a5a2e85..e5470ca 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
 import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
 import org.apache.beam.sdk.testing.PAssert;
@@ -38,10 +37,10 @@ import org.junit.Test;
 /**
  * Unbounded + Unbounded Test for {@code BeamJoinRel}.
  */
-public class BeamJoinRelUnboundedVsUnboundedTest {
+public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
   public static final Date FIRST_DATE = new Date(1);
   public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
 
@@ -49,7 +48,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
 
   @BeforeClass
   public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+    BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
         .of(Types.INTEGER, "order_id",
             Types.INTEGER, "site_id",
             Types.INTEGER, "price",
@@ -88,7 +87,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -121,7 +120,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
     // 2, 2 | 2, 5
     // 3, 3 | NULL, NULL
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -151,7 +150,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -181,7 +180,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id1=o2.order_id"
         ;
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -213,7 +212,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
         + " o1.order_id=o2.order_id"
         ;
     pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
     pipeline.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 425e554..5c4ae2c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -19,9 +19,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
 /**
  * Test for {@code BeamMinusRel}.
  */
-public class BeamMinusRelTest {
+public class BeamMinusRelTest extends BaseRelTest {
   static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
@@ -78,7 +77,7 @@ public class BeamMinusRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -100,7 +99,7 @@ public class BeamMinusRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).satisfies(new CheckSize(2));
 
     PAssert.that(rows).containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
index 4de493a..cd0297a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -21,9 +21,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 import java.sql.Types;
 import java.util.Date;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -38,7 +37,7 @@ import org.junit.Test;
 /**
  * Test for {@code BeamSetOperatorRelBase}.
  */
-public class BeamSetOperatorRelBaseTest {
+public class BeamSetOperatorRelBaseTest extends BaseRelTest {
   static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
@@ -71,7 +70,7 @@ public class BeamSetOperatorRelBaseTest {
         + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     // compare valueInString to ignore the windowStart & windowEnd
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -100,7 +99,7 @@ public class BeamSetOperatorRelBaseTest {
     // use a real pipeline rather than the TestPipeline because we are
     // testing exceptions, the pipeline will not actually run.
     Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
-    BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
+    compilePipeline(sql, pipeline1, sqlEnv);
     pipeline.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index f033fa0..19ba0d0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
 import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -35,7 +34,7 @@ import org.junit.Test;
 /**
  * Test for {@code BeamSortRel}.
  */
-public class BeamSortRelTest {
+public class BeamSortRelTest extends BaseRelTest {
   static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
@@ -78,7 +77,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
         Types.BIGINT, "order_id",
         Types.INTEGER, "site_id",
@@ -117,7 +116,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -155,7 +154,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -178,7 +177,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -201,7 +200,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 11";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -232,6 +231,6 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc limit 11";
 
     TestPipeline pipeline = TestPipeline.create();
-    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    compilePipeline(sql, pipeline, sqlEnv);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index 7cc52da..d79a54e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -19,9 +19,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
 /**
  * Test for {@code BeamUnionRel}.
  */
-public class BeamUnionRelTest {
+public class BeamUnionRelTest extends BaseRelTest {
   static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
@@ -63,7 +62,7 @@ public class BeamUnionRelTest {
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS ";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",
@@ -86,7 +85,7 @@ public class BeamUnionRelTest {
         + " SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS";
 
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.BIGINT, "order_id",

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index ff31e55..5604e32 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -19,9 +19,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
 /**
  * Test for {@code BeamValuesRel}.
  */
-public class BeamValuesRelTest {
+public class BeamValuesRelTest extends BaseRelTest {
   static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
@@ -60,7 +59,7 @@ public class BeamValuesRelTest {
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.VARCHAR, "name",
@@ -76,7 +75,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "c0",
@@ -91,7 +90,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
-    PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             Types.INTEGER, "EXPR$0",

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
new file mode 100644
index 0000000..0a320db
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Encode/decode test for the {@code BeamRecordCoder} produced by a {@code BeamRecordSqlType}.
+ */
+public class BeamSqlRowCoderTest {
+
+  @Test
+  public void encodeAndDecode() throws Exception {
+    final RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("col_tinyint", SqlTypeName.TINYINT)
+            .add("col_smallint", SqlTypeName.SMALLINT)
+            .add("col_integer", SqlTypeName.INTEGER)
+            .add("col_bigint", SqlTypeName.BIGINT)
+            .add("col_float", SqlTypeName.FLOAT)
+            .add("col_double", SqlTypeName.DOUBLE)
+            .add("col_decimal", SqlTypeName.DECIMAL)
+            .add("col_string_varchar", SqlTypeName.VARCHAR)
+            .add("col_time", SqlTypeName.TIME)
+            .add("col_timestamp", SqlTypeName.TIMESTAMP)
+            .add("col_boolean", SqlTypeName.BOOLEAN)
+            .build();
+      }
+    };
+
+    BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(
+        protoRowType.apply(new JavaTypeFactoryImpl(
+            RelDataTypeSystem.DEFAULT)));
+
+    GregorianCalendar calendar = new GregorianCalendar();
+    calendar.setTime(new Date());
+    BeamRecord row = new BeamRecord(beamSQLRowType
+        , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1
+        , BigDecimal.ZERO, "hello", calendar, new Date(), true);
+
+
+    BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
+    CoderProperties.coderDecodeEncodeEqual(coder, row);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java
new file mode 100644
index 0000000..fd88448
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for the CSV decoder and encoder transforms nested in {@code BeamKafkaCSVTable}.
+ */
+public class BeamKafkaCSVTableTest {
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+  public static BeamRecord row1;
+  public static BeamRecord row2;
+
+  @BeforeClass
+  public static void setUp() {
+    row1 = new BeamRecord(genRowType(), 1L, 1, 1.0);
+
+    row2 = new BeamRecord(genRowType(), 2L, 2, 2.0);
+  }
+
+  @Test public void testCsvRecorderDecoder() throws Exception {
+    PCollection<BeamRecord> result = pipeline
+        .apply(
+            Create.of("1,\"1\",1.0", "2,2,2.0")
+        )
+        .apply(ParDo.of(new String2KvBytes()))
+        .apply(
+            new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
+        );
+
+    PAssert.that(result).containsInAnyOrder(row1, row2);
+
+    pipeline.run();
+  }
+
+  @Test public void testCsvRecorderEncoder() throws Exception {
+    PCollection<BeamRecord> result = pipeline
+        .apply(
+            Create.of(row1, row2)
+        )
+        .apply(
+            new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT)
+        ).apply(
+            new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
+        );
+
+    PAssert.that(result).containsInAnyOrder(row1, row2);
+
+    pipeline.run();
+  }
+
+  private static BeamRecordSqlType genRowType() {
+    return CalciteUtils.toBeamRowType(new RelProtoDataType() {
+
+      @Override public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE).build();
+      }
+    }.apply(BeamQueryPlanner.TYPE_FACTORY));
+  }
+
+  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+      implements Serializable {
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(KV.of(new byte[] {}, ctx.element().getBytes()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java
new file mode 100644
index 0000000..9a57a5f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TestPipeline pipeline2 = TestPipeline.create();
+
+  /**
+   * Rows written to the CSV fixture files and expected back from every read.
+   *
+   * <p>
+   * The types of the CSV fields are, in order:
+   *     integer,bigint,float,double,string
+   * </p>
+   */
+  private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
+  private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
+
+  private static List<Object[]> testData = Arrays.asList(data1, data2);
+  private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{
+    for (Object[] data : testData) {
+      add(buildRow(data));
+    }
+  }};
+
+  private static Path tempFolder;
+  private static File readerSourceFile;
+  private static File writerTargetFile;
+
+  @Test public void testBuildIOReader() {
+    PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline.run();
+  }
+
+  @Test public void testBuildIOWriter() {
+    new BeamTextCSVTable(buildBeamSqlRowType(),
+        readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
+        .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
+            .buildIOWriter());
+    pipeline.run();
+
+    PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+        writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
+
+    // NOTE(review): setUp pre-fills writerTargetFile with these rows; confirm the write is checked
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline2.run();
+  }
+
+  @BeforeClass public static void setUp() throws IOException {
+    tempFolder = Files.createTempDirectory("BeamTextTableTest");
+    readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
+    writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
+  }
+
+  @AfterClass public static void teardownClass() throws IOException {
+    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
+
+      @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+          throws IOException {
+        Files.delete(file);
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+          throws IOException {
+        Files.delete(dir);
+        return FileVisitResult.CONTINUE;
+      }
+    });
+  }
+
+  private static File writeToFile(List<Object[]> rows, String filename) throws IOException {
+    File file = tempFolder.resolve(filename).toFile();
+    OutputStream output = new FileOutputStream(file);
+    writeToStreamAndClose(rows, output);
+    return file;
+  }
+
+  /**
+   * Helper that prints each row as one CSV record to the stream, then closes the stream.
+   * NOTE(review): an IOException is only printed, not rethrown, so a failed write goes unnoticed.
+   */
+  private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) {
+    try (PrintStream writer = new PrintStream(outputStream)) {
+      CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
+      for (Object[] row : rows) {
+        for (Object field : row) {
+          printer.print(field);
+        }
+        printer.println();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private RelProtoDataType buildRowType() {
+    return new RelProtoDataType() {
+
+      @Override public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
+            .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
+            .add("user_name", SqlTypeName.VARCHAR).build();
+      }
+    };
+  }
+
+  private static RelDataType buildRelDataType() {
+    return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER)
+        .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
+        .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
+  }
+
+  private static BeamRecordSqlType buildBeamSqlRowType() {
+    return CalciteUtils.toBeamRowType(buildRelDataType());
+  }
+
+  private static BeamRecord buildRow(Object[] data) {
+    return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
new file mode 100644
index 0000000..948e86c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.transform;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamAggregationTransforms}.
+ *
+ */
+public class BeamAggregationTransformTest extends BeamTransformBaseTest{
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  private List<AggregateCall> aggCalls;
+
+  private BeamRecordSqlType keyType;
+  private BeamRecordSqlType aggPartType;
+  private BeamRecordSqlType outputType;
+
+  private BeamRecordCoder inRecordCoder;
+  private BeamRecordCoder keyCoder;
+  private BeamRecordCoder aggCoder;
+  private BeamRecordCoder outRecordCoder;
+
+  /**
+   * Wires the aggregation steps together by hand and checks the output of three of them:
+   * the group-by keying, the combine and the final merge.
+   *
+   * <p>The resulting pipeline is equivalent to the query below.
+   * <pre>
+   * SELECT `f_int`
+   * , COUNT(*) AS `size`
+   * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
+   * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
+   * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
+   * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
+   * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
+   * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
+   * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
+   * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
+   * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
+   * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
+   * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
+   * , SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
+   * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
+   * FROM TABLE_NAME
+   * GROUP BY `f_int`
+   * </pre>
+   *
+   * @throws ParseException if an expected timestamp value cannot be parsed
+   */
+  @Test
+  public void testCountPerElementBasic() throws ParseException {
+    setupEnvironment();
+
+    // Coders for the intermediate collections, built up front so the steps below read linearly.
+    KvCoder<BeamRecord, BeamRecord> keyedRowCoder =
+        KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder);
+    KvCoder<BeamRecord, Iterable<BeamRecord>> groupedRowsCoder =
+        KvCoder.<BeamRecord, Iterable<BeamRecord>>of(
+            keyCoder, IterableCoder.<BeamRecord>of(inRecordCoder));
+    KvCoder<BeamRecord, BeamRecord> keyedAggregateCoder =
+        KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder);
+
+    PCollection<BeamRecord> rows = p.apply(Create.of(inputRows));
+
+    // Step 1: extract the fields of the group-by key.
+    PCollection<KV<BeamRecord, BeamRecord>> keyedRows = rows
+        .apply("exGroupBy", WithKeys.of(
+            new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
+        .setCoder(keyedRowCoder);
+
+    // Step 2: apply a GroupByKey.
+    PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedRows = keyedRows
+        .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create())
+        .setCoder(groupedRowsCoder);
+
+    // Step 3: run the aggregation functions.
+    PCollection<KV<BeamRecord, BeamRecord>> aggregated = groupedRows
+        .apply("aggregation", Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues(
+            new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
+        .setCoder(keyedAggregateCoder);
+
+    // Step 4: flatten each key/aggregate pair into a single record.
+    PCollection<BeamRecord> merged = aggregated
+        .apply("mergeRecord", ParDo.of(
+            new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)))
+        .setCoder(outRecordCoder);
+
+    // Output of BeamAggregationTransforms.AggregationGroupByKeyFn.
+    PAssert.that(keyedRows).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
+
+    // Output of the combine step.
+    PAssert.that(aggregated).containsInAnyOrder(prepareResultOfAggregationCombineFn());
+
+    // Output of BeamAggregationTransforms.MergeAggregationRecord.
+    PAssert.that(merged).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
+
+    p.run();
+  }
+
+  /**
+   * Initialises the per-test state: the aggregate calls first, then the row types and coders.
+   */
+  private void setupEnvironment() {
+    prepareAggregationCalls();
+    prepareTypeAndCoder();
+  }
+
+  /**
+   * Creates the list of all {@link AggregateCall}s exercised by the test.
+   *
+   * <p>The order is: {@code count} with no argument column, then SUM/AVG/MAX/MIN over input
+   * columns 1 to 5, then MAX/MIN over column 7, then SUM/AVG/MAX/MIN over column 8. Each call
+   * is named after its function and input column index, e.g. {@code sum1} or {@code max7}.
+   */
+  @SuppressWarnings("deprecation")
+  private void prepareAggregationCalls() {
+    aggCalls = new ArrayList<>();
+
+    // COUNT is the only call without an argument column.
+    aggCalls.add(
+        new AggregateCall(new SqlCountAggFunction(), false,
+            Arrays.<Integer>asList(), sqlType(SqlTypeName.BIGINT), "count"));
+
+    addSumAvgMaxMin(1, SqlTypeName.BIGINT);
+    addSumAvgMaxMin(2, SqlTypeName.SMALLINT);
+    addSumAvgMaxMin(3, SqlTypeName.TINYINT);
+    addSumAvgMaxMin(4, SqlTypeName.FLOAT);
+    addSumAvgMaxMin(5, SqlTypeName.DOUBLE);
+
+    // Only MAX and MIN are exercised for the TIMESTAMP column.
+    addMaxMin(7, SqlTypeName.TIMESTAMP);
+
+    addSumAvgMaxMin(8, SqlTypeName.INTEGER);
+  }
+
+  /**
+   * Appends SUM, AVG, MAX and MIN calls, in that order, over one input column.
+   *
+   * @param column index of the input column the calls read; also used as the name suffix
+   * @param typeName SQL type used for the SUM function and as the return type of every call
+   */
+  @SuppressWarnings("deprecation")
+  private void addSumAvgMaxMin(int column, SqlTypeName typeName) {
+    aggCalls.add(
+        new AggregateCall(new SqlSumAggFunction(sqlType(typeName)), false,
+            Arrays.<Integer>asList(column), sqlType(typeName), "sum" + column));
+    aggCalls.add(
+        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+            Arrays.<Integer>asList(column), sqlType(typeName), "avg" + column));
+    addMaxMin(column, typeName);
+  }
+
+  /**
+   * Appends MAX and MIN calls, in that order, over one input column.
+   *
+   * @param column index of the input column the calls read; also used as the name suffix
+   * @param typeName SQL type used as the return type of both calls
+   */
+  @SuppressWarnings("deprecation")
+  private void addMaxMin(int column, SqlTypeName typeName) {
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+            Arrays.<Integer>asList(column), sqlType(typeName), "max" + column));
+    aggCalls.add(
+        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+            Arrays.<Integer>asList(column), sqlType(typeName), "min" + column));
+  }
+
+  /** Creates a fresh {@link BasicSqlType} of the given name on the default type system. */
+  private static BasicSqlType sqlType(SqlTypeName typeName) {
+    return new BasicSqlType(RelDataTypeSystem.DEFAULT, typeName);
+  }
+
+  /**
+   * Initialises the row types of the key, aggregate and output records, and the record coders
+   * used between the aggregation steps.
+   */
+  private void prepareTypeAndCoder() {
+    List<KV<String, SqlTypeName>> keyColumns =
+        Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER));
+
+    // One column per aggregate call, grouped by the input column the call reads.
+    List<KV<String, SqlTypeName>> aggregateColumns = Arrays.asList(
+        KV.of("count", SqlTypeName.BIGINT),
+
+        KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
+        KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
+
+        KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
+        KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
+
+        KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
+        KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
+
+        KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
+        KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
+
+        KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
+        KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
+
+        KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
+
+        KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
+        KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER));
+
+    // Row types first ...
+    keyType = initTypeOfSqlRow(keyColumns);
+    aggPartType = initTypeOfSqlRow(aggregateColumns);
+    outputType = prepareFinalRowType();
+
+    // ... then one record coder per row type.
+    inRecordCoder = inputRowType.getRecordCoder();
+    keyCoder = keyType.getRecordCoder();
+    aggCoder = aggPartType.getRecordCoder();
+    outRecordCoder = outputType.getRecordCoder();
+  }
+
+  /**
+   * Expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}: each
+   * input row paired with a single-field key record holding the integer in the row's field 0.
+   */
+  private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() {
+    List<KV<BeamRecord, BeamRecord>> expectedPairs = new ArrayList<>();
+    for (BeamRecord row : inputRows) {
+      BeamRecord key = new BeamRecord(keyType, Arrays.<Object>asList(row.getInteger(0)));
+      expectedPairs.add(KV.of(key, row));
+    }
+    return expectedPairs;
+  }
+
+  /**
+   * Expected results after {@link BeamAggregationTransforms.AggregationCombineFn}: a single
+   * key/aggregate pair.
+   *
+   * @throws ParseException if an expected timestamp value cannot be parsed
+   */
+  private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn()
+      throws ParseException {
+    // The key is read from field 0 of the first input row.
+    BeamRecord groupKey =
+        new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0)));
+
+    // One value per column of aggPartType, in the same order.
+    BeamRecord aggregates = new BeamRecord(aggPartType, Arrays.<Object>asList(
+        4L,                                              // count
+        10000L, 2500L, 4000L, 1000L,                     // sum1, avg1, max1, min1
+        (short) 10, (short) 2, (short) 4, (short) 1,     // sum2, avg2, max2, min2
+        (byte) 10, (byte) 2, (byte) 4, (byte) 1,         // sum3, avg3, max3, min3
+        10.0F, 2.5F, 4.0F, 1.0F,                         // sum4, avg4, max4, min4
+        10.0, 2.5, 4.0, 1.0,                             // sum5, avg5, max5, min5
+        format.parse("2017-01-01 02:04:03"),             // max7
+        format.parse("2017-01-01 01:01:03"),             // min7
+        10, 2, 4, 1));                                   // sum8, avg8, max8, min8
+
+    return Arrays.asList(KV.of(groupKey, aggregates));
+  }
+
+  /**
+   * Row type of the final output row: the group-by column {@code f_int} followed by one column
+   * per aggregate call.
+   */
+  private BeamRecordSqlType prepareFinalRowType() {
+    // NOTE(review): the loop below repeats what the inherited initTypeOfSqlRow(...) does; it is
+    // kept inline so this method stays self-contained.
+    List<KV<String, SqlTypeName>> outputColumns = Arrays.asList(
+        KV.of("f_int", SqlTypeName.INTEGER),
+        KV.of("count", SqlTypeName.BIGINT),
+
+        KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
+        KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
+
+        KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
+        KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
+
+        KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
+        KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
+
+        KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
+        KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
+
+        KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
+        KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
+
+        KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
+
+        KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
+        KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER));
+
+    FieldInfoBuilder rowTypeBuilder = BeamQueryPlanner.TYPE_FACTORY.builder();
+    for (KV<String, SqlTypeName> column : outputColumns) {
+      rowTypeBuilder.add(column.getKey(), column.getValue());
+    }
+    return CalciteUtils.toBeamRowType(rowTypeBuilder.build());
+  }
+
+  /**
+   * Expected result after {@link BeamAggregationTransforms.MergeAggregationRecord}: one record
+   * holding the group-by value followed by every aggregate value.
+   *
+   * @throws ParseException if an expected timestamp value cannot be parsed
+   */
+  private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException {
+    // One value per column of outputType, in the same order.
+    List<Object> mergedValues = Arrays.<Object>asList(
+        1,                                               // f_int
+        4L,                                              // count
+        10000L, 2500L, 4000L, 1000L,                     // sum1, avg1, max1, min1
+        (short) 10, (short) 2, (short) 4, (short) 1,     // sum2, avg2, max2, min2
+        (byte) 10, (byte) 2, (byte) 4, (byte) 1,         // sum3, avg3, max3, min3
+        10.0F, 2.5F, 4.0F, 1.0F,                         // sum4, avg4, max4, min4
+        10.0, 2.5, 4.0, 1.0,                             // sum5, avg5, max5, min5
+        format.parse("2017-01-01 02:04:03"),             // max7
+        format.parse("2017-01-01 01:01:03"),             // min7
+        10, 2, 4, 1);                                    // sum8, avg8, max8, min8
+
+    return new BeamRecord(outputType, mergedValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java
new file mode 100644
index 0000000..3c8f040
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.transform;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+
+/**
+ * Shared input fixture and row-building helpers for tests of the PTransforms that execute
+ * Beam SQL steps.
+ */
+public class BeamTransformBaseTest {
+  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  public static BeamRecordSqlType inputRowType;
+  public static List<BeamRecord> inputRows;
+
+  /**
+   * Populates {@link #inputRowType} and {@link #inputRows} before the tests of a class run.
+   *
+   * <p>The fixture has nine columns and four rows; every row carries {@code 1} in
+   * {@code f_int}.
+   */
+  @BeforeClass
+  public static void prepareInput() throws NumberFormatException, ParseException{
+    List<KV<String, SqlTypeName>> columns = Arrays.asList(
+        KV.of("f_int", SqlTypeName.INTEGER),
+        KV.of("f_long", SqlTypeName.BIGINT),
+        KV.of("f_short", SqlTypeName.SMALLINT),
+        KV.of("f_byte", SqlTypeName.TINYINT),
+        KV.of("f_float", SqlTypeName.FLOAT),
+        KV.of("f_double", SqlTypeName.DOUBLE),
+        KV.of("f_string", SqlTypeName.VARCHAR),
+        KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
+        KV.of("f_int2", SqlTypeName.INTEGER));
+    inputRowType = initTypeOfSqlRow(columns);
+
+    BeamRecord firstRow = initBeamSqlRow(columns, Arrays.<Object>asList(
+        1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
+        "string_row1", format.parse("2017-01-01 01:01:03"), 1));
+    BeamRecord secondRow = initBeamSqlRow(columns, Arrays.<Object>asList(
+        1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
+        "string_row2", format.parse("2017-01-01 01:02:03"), 2));
+    BeamRecord thirdRow = initBeamSqlRow(columns, Arrays.<Object>asList(
+        1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
+        "string_row3", format.parse("2017-01-01 01:03:03"), 3));
+    BeamRecord fourthRow = initBeamSqlRow(columns, Arrays.<Object>asList(
+        1, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0F, 4.0,
+        "string_row4", format.parse("2017-01-01 02:04:03"), 4));
+    inputRows = Arrays.asList(firstRow, secondRow, thirdRow, fourthRow);
+  }
+
+  /**
+   * Creates a {@link BeamRecordSqlType} with one field per entry of the given column metadata,
+   * in list order.
+   */
+  public static BeamRecordSqlType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+    FieldInfoBuilder fields = BeamQueryPlanner.TYPE_FACTORY.builder();
+    for (KV<String, SqlTypeName> column : columnMetadata) {
+      fields.add(column.getKey(), column.getValue());
+    }
+    return CalciteUtils.toBeamRowType(fields.build());
+  }
+
+  /**
+   * Creates a row of the given column metadata with an empty list of values.
+   */
+  public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+    return initBeamSqlRow(columnMetadata, Arrays.<Object>asList());
+  }
+
+  /**
+   * Creates a row of the given column metadata holding the given values, one per column.
+   */
+  public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
+      List<Object> rowValues) {
+    return new BeamRecord(initTypeOfSqlRow(columnMetadata), rowValues);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 5898e2e..a64afa6 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -29,10 +29,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.BeamRecord;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index 4ce2f45..a836f79 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -21,8 +21,8 @@ package org.apache.beam.sdk.extensions.sql.integrationtest;
 import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 60e8211..cf66268 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -25,8 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 426789c..d661866 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -19,8 +19,8 @@
 package org.apache.beam.sdk.extensions.sql.mock;
 
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
index 465705d..31234e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;