You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:09 UTC

[46/59] beam git commit: move all implementation classes/packages into impl package

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
new file mode 100644
index 0000000..1dbd8b4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Unbounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelUnboundedVsBoundedTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  public static final Date FIRST_DATE = new Date(1);
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+  public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+        .of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.TIMESTAMP, "order_time"
+        )
+        .timestampColumnIndex(3)
+        .addRows(
+            Duration.ZERO,
+            1, 1, 1, FIRST_DATE,
+            1, 2, 2, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(Duration.standardSeconds(1)),
+            2, 2, 3, SECOND_DATE,
+            2, 3, 3, SECOND_DATE,
+            // this late data is omitted
+            1, 2, 3, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)),
+            3, 3, 3, THIRD_DATE,
+            // this late data is omitted
+            2, 2, 3, SECOND_DATE
+        )
+    );
+
+    beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
+        .of(Types.INTEGER, "order_id",
+            Types.VARCHAR, "buyer"
+        ).addRows(
+            1, "james",
+            2, "bond"
+        ));
+  }
+
+  @Test
+  public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond"
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond"
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond",
+                3, 3, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testLeftOuterJoinError() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " RIGHT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond",
+                3, 3, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testRightOuterJoinError() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testFullOuterJoinError() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
new file mode 100644
index 0000000..5e5e416
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Unbounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelUnboundedVsUnboundedTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  public static final Date FIRST_DATE = new Date(1);
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+        .of(Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.TIMESTAMP, "order_time"
+        )
+        .timestampColumnIndex(3)
+        .addRows(
+            Duration.ZERO,
+            1, 1, 1, FIRST_DATE,
+            1, 2, 6, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+            2, 2, 7, SECOND_DATE,
+            2, 3, 8, SECOND_DATE,
+            // this late record is omitted(First window)
+            1, 3, 3, FIRST_DATE
+        )
+        .addRows(
+            // this late record is omitted(Second window)
+            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+            2, 3, 3, SECOND_DATE
+        )
+    );
+  }
+
+  @Test
+  public void testInnerJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0").addRows(
+                1, 3, 1, 3,
+                2, 5, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    // 1, 1 | 1, 3
+    // 2, 2 | NULL, NULL
+    // ---- | -----
+    // 2, 2 | 2, 5
+    // 3, 3 | NULL, NULL
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 1, 1, 3,
+                2, 2, null, null,
+                2, 2, 2, 5,
+                3, 3, null, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 3, 1, 1,
+                null, null, 2, 2,
+                2, 5, 2, 2,
+                null, null, 3, 3
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id1=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id1",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 1, 1, 3,
+                6, 2, null, null,
+                7, 2, null, null,
+                8, 3, null, null,
+                null, null, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWindowsMismatch() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
new file mode 100644
index 0000000..9149dd4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamMinusRel}.
+ */
+public class BeamMinusRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS1",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            4L, 4, 4.0,
+            4L, 4, 4.0
+        )
+    );
+
+    sqlEnv.registerTable("ORDER_DETAILS2",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            3L, 3, 3.0
+        )
+    );
+  }
+
+  @Test
+  public void testExcept() throws Exception {
+    String sql = "";
+    sql += "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " EXCEPT "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            4L, 4, 4.0
+        ).getRows());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testExceptAll() throws Exception {
+    String sql = "";
+    sql += "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " EXCEPT ALL "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).satisfies(new CheckSize(2));
+
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            4L, 4, 4.0,
+            4L, 4, 4.0
+        ).getRows());
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
new file mode 100644
index 0000000..36538c0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSetOperatorRelBase}.
+ */
+public class BeamSetOperatorRelBaseTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  public static final Date THE_DATE = new Date(100000);
+
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price",
+            Types.TIMESTAMP, "order_time"
+        ).addRows(
+            1L, 1, 1.0, THE_DATE,
+            2L, 2, 2.0, THE_DATE
+        )
+    );
+  }
+
+  @Test
+  public void testSameWindow() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    // compare valueInString to ignore the windowStart & windowEnd
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.BIGINT, "order_id",
+                Types.INTEGER, "site_id",
+                Types.BIGINT, "cnt"
+            ).addRows(
+                1L, 1, 1L,
+                2L, 2, 1L
+            ).getStringRows());
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDifferentWindows() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
+
+    // use a real pipeline rather than the TestPipeline because we are
+    // testing exceptions, the pipeline will not actually run.
+    Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
+    BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
new file mode 100644
index 0000000..15e3b89
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSortRel}.
+ */
+public class BeamSortRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price",
+            Types.TIMESTAMP, "order_time"
+        ).addRows(
+            1L, 2, 1.0, new Date(),
+            1L, 1, 2.0, new Date(),
+            2L, 4, 3.0, new Date(),
+            2L, 1, 4.0, new Date(),
+            5L, 5, 5.0, new Date(),
+            6L, 6, 6.0, new Date(),
+            7L, 7, 7.0, new Date(),
+            8L, 8888, 8.0, new Date(),
+            8L, 999, 9.0, new Date(),
+            10L, 100, 10.0, new Date()
+        )
+    );
+    sqlEnv.registerTable("SUB_ORDER_RAM",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        )
+    );
+  }
+
+  @Test
+  public void testOrderBy_basic() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc limit 4";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
+        Types.BIGINT, "order_id",
+        Types.INTEGER, "site_id",
+        Types.DOUBLE, "price"
+    ).addRows(
+        1L, 2, 1.0,
+        1L, 1, 2.0,
+        2L, 4, 3.0,
+        2L, 1, 4.0
+    ).getRows());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testOrderBy_nullsFirst() throws Exception {
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 2, 1.0,
+            1L, null, 2.0,
+            2L, 1, 3.0,
+            2L, null, 4.0,
+            5L, 5, 5.0
+        )
+    );
+    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+        .of(Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"));
+
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, null, 2.0,
+            1L, 2, 1.0,
+            2L, null, 4.0,
+            2L, 1, 3.0
+        ).getRows()
+    );
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testOrderBy_nullsLast() throws Exception {
+    sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable
+        .of(Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 2, 1.0,
+            1L, null, 2.0,
+            2L, 1, 3.0,
+            2L, null, 4.0,
+            5L, 5, 5.0));
+    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+        .of(Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"));
+
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 2, 1.0,
+            1L, null, 2.0,
+            2L, 1, 3.0,
+            2L, null, 4.0
+        ).getRows()
+    );
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testOrderBy_with_offset() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            5L, 5, 5.0,
+            6L, 6, 6.0,
+            7L, 7, 7.0,
+            8L, 8888, 8.0
+        ).getRows()
+    );
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testOrderBy_bigFetch() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "ORDER BY order_id asc, site_id desc limit 11";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 2, 1.0,
+            1L, 1, 2.0,
+            2L, 4, 3.0,
+            2L, 1, 4.0,
+            5L, 5, 5.0,
+            6L, 6, 6.0,
+            7L, 7, 7.0,
+            8L, 8888, 8.0,
+            8L, 999, 9.0,
+            10L, 100, 10.0
+        ).getRows()
+    );
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testOrderBy_exception() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id)  SELECT "
+        + " order_id, COUNT(*) "
+        + "FROM ORDER_DETAILS "
+        + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
+        + "ORDER BY order_id asc limit 11";
+
+    TestPipeline pipeline = TestPipeline.create();
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
new file mode 100644
index 0000000..c232b30
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamUnionRel}.
+ */
+public class BeamUnionRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        )
+    );
+  }
+
+  @Test
+  public void testUnion() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + " UNION SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getRows()
+    );
+    pipeline.run();
+  }
+
+  @Test
+  public void testUnionAll() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS"
+        + " UNION ALL "
+        + " SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            2L, 2, 2.0
+        ).getRows()
+    );
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
new file mode 100644
index 0000000..e5fa864
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamValuesRel}.
+ */
+public class BeamValuesRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("string_table",
+        MockedBoundedTable.of(
+            Types.VARCHAR, "name",
+            Types.VARCHAR, "description"
+        )
+    );
+    sqlEnv.registerTable("int_table",
+        MockedBoundedTable.of(
+            Types.INTEGER, "c0",
+            Types.INTEGER, "c1"
+        )
+    );
+  }
+
+  @Test
+  public void testValues() throws Exception {
+    String sql = "insert into string_table(name, description) values "
+        + "('hello', 'world'), ('james', 'bond')";
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.VARCHAR, "name",
+            Types.VARCHAR, "description"
+        ).addRows(
+            "hello", "world",
+            "james", "bond"
+        ).getRows()
+    );
+    pipeline.run();
+  }
+
+  @Test
+  public void testValues_castInt() throws Exception {
+    String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "c0",
+            Types.INTEGER, "c1"
+        ).addRows(
+            1, 2
+        ).getRows()
+    );
+    pipeline.run();
+  }
+
+  @Test
+  public void testValues_onlySelect() throws Exception {
+    String sql = "select 1, '1'";
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "EXPR$0",
+            Types.CHAR, "EXPR$1"
+        ).addRows(
+            1, "1"
+        ).getRows()
+    );
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
new file mode 100644
index 0000000..8cdf2cd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Assert;
+
+/**
+ * Utility class to check size of BeamSQLRow iterable.
+ */
+public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+  private int size;
+  public CheckSize(int size) {
+    this.size = size;
+  }
+  @Override public Void apply(Iterable<BeamSqlRow> input) {
+    int count = 0;
+    for (BeamSqlRow row : input) {
+      count++;
+    }
+    Assert.assertEquals(size, count);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
deleted file mode 100644
index 2843e41..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter;
-
-import static org.junit.Assert.assertTrue;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.TimeZone;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit test cases for {@link BeamSqlFnExecutor}.
- */
-public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
-
-  @Test
-  public void testBeamFilterRel() {
-    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
-        Arrays.asList(
-            rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
-                Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
-                    rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
-            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
-                Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
-                    rexBuilder.makeExactLiteral(new BigDecimal(0))))));
-
-    BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null,
-        condition);
-
-    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(beamFilterRel);
-    executor.prepare();
-
-    Assert.assertEquals(1, executor.exps.size());
-
-    BeamSqlExpression l1Exp = executor.exps.get(0);
-    assertTrue(l1Exp instanceof BeamSqlAndExpression);
-    Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
-
-    Assert.assertEquals(2, l1Exp.getOperands().size());
-    BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
-    BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
-
-    assertTrue(l1Left instanceof BeamSqlLessThanOrEqualsExpression);
-    assertTrue(l1Right instanceof BeamSqlEqualsExpression);
-
-    Assert.assertEquals(2, l1Left.getOperands().size());
-    BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);
-    BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1);
-    assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
-    assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
-
-    Assert.assertEquals(2, l1Right.getOperands().size());
-    BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0);
-    BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1);
-    assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
-    assertTrue(l1RightRight instanceof BeamSqlPrimitive);
-  }
-
-  @Test
-  public void testBeamProjectRel() {
-    BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
-        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
-        rexBuilder.identityProjects(relDataType), relDataType);
-    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(relNode);
-
-    executor.prepare();
-    Assert.assertEquals(4, executor.exps.size());
-    assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
-    assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
-    assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
-    assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
-  }
-
-
-  @Test
-  public void testBuildExpression_logical() {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral(false)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlAndExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OR,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral(false)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOrExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlNotExpression);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testBuildExpression_logical_andOr_invalidOperand() {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    BeamSqlFnExecutor.buildExpression(rexNode);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testBuildExpression_logical_not_invalidOperand() {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    BeamSqlFnExecutor.buildExpression(rexNode);
-  }
-
-
-  @Test(expected = IllegalStateException.class)
-  public void testBuildExpression_logical_not_invalidOperandCount() {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral(true)
-        )
-    );
-    BeamSqlFnExecutor.buildExpression(rexNode);
-  }
-
-  @Test
-  public void testBuildExpression_arithmetic() {
-    testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
-  }
-
-  private void testBuildArithmeticExpression(SqlOperator fn,
-      Class<? extends BeamSqlExpression> clazz) {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(fn, Arrays.asList(
-        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
-        rexBuilder.makeBigintLiteral(new BigDecimal(1L))
-    ));
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-
-    assertTrue(exp.getClass().equals(clazz));
-  }
-
-  @Test
-  public void testBuildExpression_string()  {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello "),
-            rexBuilder.makeLiteral("world")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlConcatExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello"),
-            rexBuilder.makeLiteral("worldhello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlPositionExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello"),
-            rexBuilder.makeLiteral("worldhello"),
-            rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
-                rexBuilder.makeBigintLiteral(BigDecimal.ONE))
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlPositionExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCharLengthExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlUpperExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlLowerExpression);
-
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlInitCapExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
-        Arrays.asList(
-            rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlTrimExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlSubstringExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlSubstringExpression);
-
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOverlayExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOverlayExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCaseExpression);
-  }
-
-  @Test
-  public void testBuildExpression_date() {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(TimeZone.getTimeZone("GMT"));
-    calendar.setTime(new Date());
-
-    // CEIL
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CEIL,
-        Arrays.asList(
-            rexBuilder.makeDateLiteral(calendar),
-            rexBuilder.makeFlag(TimeUnitRange.MONTH)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlDateCeilExpression);
-
-    // FLOOR
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.FLOOR,
-        Arrays.asList(
-            rexBuilder.makeDateLiteral(calendar),
-            rexBuilder.makeFlag(TimeUnitRange.MONTH)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlDateFloorExpression);
-
-    // EXTRACT == EXTRACT_DATE?
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT,
-        Arrays.asList(
-            rexBuilder.makeFlag(TimeUnitRange.MONTH),
-            rexBuilder.makeDateLiteral(calendar)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlExtractExpression);
-
-    // CURRENT_DATE
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE,
-        Arrays.<RexNode>asList(
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCurrentDateExpression);
-
-    // LOCALTIME
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME,
-        Arrays.<RexNode>asList(
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCurrentTimeExpression);
-
-    // LOCALTIMESTAMP
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP,
-        Arrays.<RexNode>asList(
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCurrentTimestampExpression);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
deleted file mode 100644
index c6478a6..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.planner.BeamRelDataTypeSystem;
-import org.apache.beam.sdk.extensions.sql.planner.BeamRuleSets;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelBuilder;
-import org.junit.BeforeClass;
-
-/**
- * base class to test {@link BeamSqlFnExecutor} and subclasses of {@link BeamSqlExpression}.
- */
-public class BeamSqlFnExecutorTestBase {
-  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
-  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
-
-  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-  public static RelDataType relDataType;
-
-  public static BeamSqlRowType beamRowType;
-  public static BeamSqlRow record;
-
-  public static RelBuilder relBuilder;
-
-  @BeforeClass
-  public static void prepare() {
-    relDataType = TYPE_FACTORY.builder()
-        .add("order_id", SqlTypeName.BIGINT)
-        .add("site_id", SqlTypeName.INTEGER)
-        .add("price", SqlTypeName.DOUBLE)
-        .add("order_time", SqlTypeName.BIGINT).build();
-
-    beamRowType = CalciteUtils.toBeamRowType(relDataType);
-    record = new BeamSqlRow(beamRowType);
-
-    record.addField(0, 1234567L);
-    record.addField(1, 0);
-    record.addField(2, 8.9);
-    record.addField(3, 1234567L);
-
-    SchemaPlus schema = Frameworks.createRootSchema(true);
-    final List<RelTraitDef> traitDefs = new ArrayList<>();
-    traitDefs.add(ConventionTraitDef.INSTANCE);
-    traitDefs.add(RelCollationTraitDef.INSTANCE);
-    FrameworkConfig config = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
-
-    relBuilder = RelBuilder.create(config);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
deleted file mode 100644
index 7bfbe20..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test cases for {@link BeamSqlIsNullExpression} and
- * {@link BeamSqlIsNotNullExpression}.
- */
-public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test
-  public void testIsNull() {
-    BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression(
-        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
-
-    BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression(
-        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
-  }
-
-  @Test
-  public void testIsNotNull() {
-    BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression(
-        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
-
-    BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression(
-        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
deleted file mode 100644
index b6f65a1..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}.
- */
-public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test
-  public void testAnd() {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-
-    Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue());
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-
-    Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue());
-  }
-
-  @Test
-  public void testOr() {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-
-    Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue());
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-
-    Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue());
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
deleted file mode 100644
index 28ed920..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlCaseExpression.
- */
-public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertTrue(new BeamSqlCaseExpression(operands).accept());
-
-    // even param count
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertFalse(new BeamSqlCaseExpression(operands).accept());
-
-    // `when` type error
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertFalse(new BeamSqlCaseExpression(operands).accept());
-
-    // `then` type mixing
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertFalse(new BeamSqlCaseExpression(operands).accept());
-
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertEquals("hello", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertEquals("world", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertEquals("hello1", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
deleted file mode 100644
index feefc45..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test for {@link BeamSqlCastExpression}.
- */
-public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  private List<BeamSqlExpression> operands;
-
-  @Before
-  public void setup() {
-    operands = new ArrayList<>();
-  }
-
-  @Test
-  public void testForOperands() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa"));
-    Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept());
-  }
-
-  @Test
-  public void testForIntegerToBigintTypeCasting() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
-    Assert.assertEquals(5L,
-        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
-  }
-
-  @Test
-  public void testForDoubleToBigIntCasting() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
-    Assert.assertEquals(5L,
-        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
-  }
-
-  @Test
-  public void testForIntegerToDateCast() {
-    // test for yyyyMMdd format
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
-    Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
-  }
-
-  @Test
-  public void testyyyyMMddDateFormat() {
-    //test for yyyy-MM-dd format
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
-    Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
-  }
-
-  @Test
-  public void testyyMMddDateFormat() {
-    // test for yy.MM.dd format
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
-    Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
-  }
-
-  @Test
-  public void testForTimestampCastExpression() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
-    Assert.assertEquals(SqlTypeName.TIMESTAMP,
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
-            .getOutputType());
-  }
-
-  @Test
-  public void testDateTimeFormatWithMillis() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
-    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
-  }
-
-  @Test
-  public void testDateTimeFormatWithTimezone() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
-    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
-  }
-
-  @Test
-  public void testDateTimeFormat() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
-    Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void testForCastTypeNotSupported() {
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
-    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
-  }
-
-}