You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:07 UTC

[44/59] beam git commit: move all implementation classes/packages into impl package

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
deleted file mode 100644
index 2ca0a98..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlOverlayExpression.
- */
-public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertTrue(new BeamSqlOverlayExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertTrue(new BeamSqlOverlayExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
-    Assert.assertEquals("w3resou33rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
-    Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
-    Assert.assertEquals("w3resouce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
deleted file mode 100644
index a8e3dd2..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlPositionExpression.
- */
-public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertTrue(new BeamSqlPositionExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertTrue(new BeamSqlPositionExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertFalse(new BeamSqlPositionExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertFalse(new BeamSqlPositionExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
deleted file mode 100644
index f23a18d..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlStringUnaryExpression.
- */
-public class BeamSqlStringUnaryExpressionTest {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertTrue(new BeamSqlCharLengthExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
deleted file mode 100644
index ea929a4..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlSubstringExpression.
- */
-public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertTrue(new BeamSqlSubstringExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertTrue(new BeamSqlSubstringExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertEquals("he",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
-    assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
-    assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
-    assertEquals("",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
-    assertEquals("",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
-    assertEquals("o",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
deleted file mode 100644
index 8b2570e..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlTrimExpression.
- */
-public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
-    assertTrue(new BeamSqlTrimExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    assertTrue(new BeamSqlTrimExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    assertFalse(new BeamSqlTrimExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.LEADING));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    Assert.assertEquals("__hehe",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.TRAILING));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    Assert.assertEquals("hehe__",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"));
-    Assert.assertEquals("__",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
-    Assert.assertEquals("hello",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
-  }
-
-  @Test public void leadingTrim() throws Exception {
-    assertEquals("__hehe",
-        BeamSqlTrimExpression.leadingTrim("hehe__hehe", "he"));
-  }
-
-  @Test public void trailingTrim() throws Exception {
-    assertEquals("hehe__",
-        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"));
-  }
-
-  @Test public void trim() throws Exception {
-    assertEquals("__",
-        BeamSqlTrimExpression.leadingTrim(
-        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he"
-        ));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
deleted file mode 100644
index a225cd6..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test of BeamSqlUpperExpression.
- */
-public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertEquals("HELLO",
-        new BeamSqlUpperExpression(operands).evaluate(record).getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java
deleted file mode 100644
index 7b8d9a4..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamIntersectRel}.
- */
-public class BeamIntersectRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @BeforeClass
-  public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS1",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            1L, 1, 1.0,
-            2L, 2, 2.0,
-            4L, 4, 4.0
-        )
-    );
-
-    sqlEnv.registerTable("ORDER_DETAILS2",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            2L, 2, 2.0,
-            3L, 3, 3.0
-        )
-    );
-  }
-
-  @Test
-  public void testIntersect() throws Exception {
-    String sql = "";
-    sql += "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS1 "
-        + " INTERSECT "
-        + "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS2 ";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            2L, 2, 2.0
-        ).getRows());
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testIntersectAll() throws Exception {
-    String sql = "";
-    sql += "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS1 "
-        + " INTERSECT ALL "
-        + "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS2 ";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).satisfies(new CheckSize(3));
-
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            1L, 1, 1.0,
-            2L, 2, 2.0
-        ).getRows());
-
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
deleted file mode 100644
index 2acee82..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Bounded + Bounded Test for {@code BeamJoinRel}.
- */
-public class BeamJoinRelBoundedVsBoundedTest {
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
-
-  public static final MockedBoundedTable ORDER_DETAILS1 =
-      MockedBoundedTable.of(
-          Types.INTEGER, "order_id",
-          Types.INTEGER, "site_id",
-          Types.INTEGER, "price"
-      ).addRows(
-          1, 2, 3,
-          2, 3, 3,
-          3, 4, 5
-      );
-
-  public static final MockedBoundedTable ORDER_DETAILS2 =
-      MockedBoundedTable.of(
-          Types.INTEGER, "order_id",
-          Types.INTEGER, "site_id",
-          Types.INTEGER, "price"
-      ).addRows(
-          1, 2, 3,
-          2, 3, 3,
-          3, 4, 5
-      );
-
-  @BeforeClass
-  public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
-    beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
-  }
-
-  @Test
-  public void testInnerJoin() throws Exception {
-    String sql =
-        "SELECT *  "
-        + "FROM ORDER_DETAILS1 o1"
-        + " JOIN ORDER_DETAILS2 o2"
-        + " on "
-        + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.INTEGER, "order_id",
-            Types.INTEGER, "site_id",
-            Types.INTEGER, "price",
-            Types.INTEGER, "order_id0",
-            Types.INTEGER, "site_id0",
-            Types.INTEGER, "price0"
-        ).addRows(
-            2, 3, 3, 1, 2, 3
-        ).getRows());
-    pipeline.run();
-  }
-
-  @Test
-  public void testLeftOuterJoin() throws Exception {
-    String sql =
-        "SELECT *  "
-            + "FROM ORDER_DETAILS1 o1"
-            + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
-            + " on "
-            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.enableAbandonedNodeEnforcement(false);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.INTEGER, "order_id",
-            Types.INTEGER, "site_id",
-            Types.INTEGER, "price",
-            Types.INTEGER, "order_id0",
-            Types.INTEGER, "site_id0",
-            Types.INTEGER, "price0"
-        ).addRows(
-            1, 2, 3, null, null, null,
-            2, 3, 3, 1, 2, 3,
-            3, 4, 5, null, null, null
-        ).getRows());
-    pipeline.run();
-  }
-
-  @Test
-  public void testRightOuterJoin() throws Exception {
-    String sql =
-        "SELECT *  "
-            + "FROM ORDER_DETAILS1 o1"
-            + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
-            + " on "
-            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.INTEGER, "order_id",
-            Types.INTEGER, "site_id",
-            Types.INTEGER, "price",
-            Types.INTEGER, "order_id0",
-            Types.INTEGER, "site_id0",
-            Types.INTEGER, "price0"
-        ).addRows(
-            2, 3, 3, 1, 2, 3,
-            null, null, null, 2, 3, 3,
-            null, null, null, 3, 4, 5
-        ).getRows());
-    pipeline.run();
-  }
-
-  @Test
-  public void testFullOuterJoin() throws Exception {
-    String sql =
-        "SELECT *  "
-            + "FROM ORDER_DETAILS1 o1"
-            + " FULL OUTER JOIN ORDER_DETAILS2 o2"
-            + " on "
-            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-          Types.INTEGER, "order_id",
-          Types.INTEGER, "site_id",
-          Types.INTEGER, "price",
-          Types.INTEGER, "order_id0",
-          Types.INTEGER, "site_id0",
-          Types.INTEGER, "price0"
-        ).addRows(
-          2, 3, 3, 1, 2, 3,
-          1, 2, 3, null, null, null,
-          3, 4, 5, null, null, null,
-          null, null, null, 2, 3, 3,
-          null, null, null, 3, 4, 5
-        ).getRows());
-    pipeline.run();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testException_nonEqualJoin() throws Exception {
-    String sql =
-        "SELECT *  "
-            + "FROM ORDER_DETAILS1 o1"
-            + " JOIN ORDER_DETAILS2 o2"
-            + " on "
-            + " o1.order_id>o2.site_id"
-        ;
-
-    pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.run();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testException_crossJoin() throws Exception {
-    String sql =
-        "SELECT *  "
-            + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
-
-    pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
deleted file mode 100644
index e226b70..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unbounded + Unbounded Test for {@code BeamJoinRel}.
- */
-public class BeamJoinRelUnboundedVsBoundedTest {
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
-  public static final Date FIRST_DATE = new Date(1);
-  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
-  public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
-  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
-
-  @BeforeClass
-  public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
-        .of(
-            Types.INTEGER, "order_id",
-            Types.INTEGER, "site_id",
-            Types.INTEGER, "price",
-            Types.TIMESTAMP, "order_time"
-        )
-        .timestampColumnIndex(3)
-        .addRows(
-            Duration.ZERO,
-            1, 1, 1, FIRST_DATE,
-            1, 2, 2, FIRST_DATE
-        )
-        .addRows(
-            WINDOW_SIZE.plus(Duration.standardSeconds(1)),
-            2, 2, 3, SECOND_DATE,
-            2, 3, 3, SECOND_DATE,
-            // this late data is omitted
-            1, 2, 3, FIRST_DATE
-        )
-        .addRows(
-            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)),
-            3, 3, 3, THIRD_DATE,
-            // this late data is omitted
-            2, 2, 3, SECOND_DATE
-        )
-    );
-
-    beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
-        .of(Types.INTEGER, "order_id",
-            Types.VARCHAR, "buyer"
-        ).addRows(
-            1, "james",
-            2, "bond"
-        ));
-  }
-
-  @Test
-  public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " JOIN "
-        + " ORDER_DETAILS1 o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.VARCHAR, "buyer"
-            ).addRows(
-                1, 3, "james",
-                2, 5, "bond"
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test
-  public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + " ORDER_DETAILS1 o2 "
-        + " JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.VARCHAR, "buyer"
-            ).addRows(
-                1, 3, "james",
-                2, 5, "bond"
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test
-  public void testLeftOuterJoin() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " LEFT OUTER JOIN "
-        + " ORDER_DETAILS1 o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.VARCHAR, "buyer"
-            ).addRows(
-                1, 3, "james",
-                2, 5, "bond",
-                3, 3, null
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testLeftOuterJoinError() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + " ORDER_DETAILS1 o2 "
-        + " LEFT OUTER JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-    pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.run();
-  }
-
-  @Test
-  public void testRightOuterJoin() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + " ORDER_DETAILS1 o2 "
-        + " RIGHT OUTER JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.VARCHAR, "buyer"
-            ).addRows(
-                1, 3, "james",
-                2, 5, "bond",
-                3, 3, null
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testRightOuterJoinError() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " RIGHT OUTER JOIN "
-        + " ORDER_DETAILS1 o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.run();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testFullOuterJoinError() throws Exception {
-    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
-        + " ORDER_DETAILS1 o2 "
-        + " FULL OUTER JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-    pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
deleted file mode 100644
index c366a6e..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unbounded + Unbounded Test for {@code BeamJoinRel}.
- */
-public class BeamJoinRelUnboundedVsUnboundedTest {
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
-  public static final Date FIRST_DATE = new Date(1);
-  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
-
-  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
-
-  @BeforeClass
-  public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
-        .of(Types.INTEGER, "order_id",
-            Types.INTEGER, "site_id",
-            Types.INTEGER, "price",
-            Types.TIMESTAMP, "order_time"
-        )
-        .timestampColumnIndex(3)
-        .addRows(
-            Duration.ZERO,
-            1, 1, 1, FIRST_DATE,
-            1, 2, 6, FIRST_DATE
-        )
-        .addRows(
-            WINDOW_SIZE.plus(Duration.standardMinutes(1)),
-            2, 2, 7, SECOND_DATE,
-            2, 3, 8, SECOND_DATE,
-            // this late record is omitted(First window)
-            1, 3, 3, FIRST_DATE
-        )
-        .addRows(
-            // this late record is omitted(Second window)
-            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
-            2, 3, 3, SECOND_DATE
-        )
-    );
-  }
-
-  @Test
-  public void testInnerJoin() throws Exception {
-    String sql = "SELECT * FROM "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.INTEGER, "order_id0",
-                Types.INTEGER, "sum_site_id0").addRows(
-                1, 3, 1, 3,
-                2, 5, 2, 5
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test
-  public void testLeftOuterJoin() throws Exception {
-    String sql = "SELECT * FROM "
-        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " LEFT OUTER JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    // 1, 1 | 1, 3
-    // 2, 2 | NULL, NULL
-    // ---- | -----
-    // 2, 2 | 2, 5
-    // 3, 3 | NULL, NULL
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.INTEGER, "order_id0",
-                Types.INTEGER, "sum_site_id0"
-            ).addRows(
-                1, 1, 1, 3,
-                2, 2, null, null,
-                2, 2, 2, 5,
-                3, 3, null, null
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test
-  public void testRightOuterJoin() throws Exception {
-    String sql = "SELECT * FROM "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " RIGHT OUTER JOIN "
-        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id",
-                Types.INTEGER, "order_id0",
-                Types.INTEGER, "sum_site_id0"
-            ).addRows(
-                1, 3, 1, 1,
-                null, null, 2, 2,
-                2, 5, 2, 2,
-                null, null, 3, 3
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test
-  public void testFullOuterJoin() throws Exception {
-    String sql = "SELECT * FROM "
-        + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
-        + " FULL OUTER JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
-        + " on "
-        + " o1.order_id1=o2.order_id"
-        ;
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.INTEGER, "order_id1",
-                Types.INTEGER, "sum_site_id",
-                Types.INTEGER, "order_id",
-                Types.INTEGER, "sum_site_id0"
-            ).addRows(
-                1, 1, 1, 3,
-                6, 2, null, null,
-                7, 2, null, null,
-                8, 3, null, null,
-                null, null, 2, 5
-            ).getStringRows()
-        );
-    pipeline.run();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testWindowsMismatch() throws Exception {
-    String sql = "SELECT * FROM "
-        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
-        + " LEFT OUTER JOIN "
-        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
-        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
-        + " on "
-        + " o1.order_id=o2.order_id"
-        ;
-    pipeline.enableAbandonedNodeEnforcement(false);
-    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java
deleted file mode 100644
index f2ed132..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamMinusRel}.
- */
-public class BeamMinusRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @BeforeClass
-  public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS1",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            1L, 1, 1.0,
-            2L, 2, 2.0,
-            4L, 4, 4.0,
-            4L, 4, 4.0
-        )
-    );
-
-    sqlEnv.registerTable("ORDER_DETAILS2",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            2L, 2, 2.0,
-            3L, 3, 3.0
-        )
-    );
-  }
-
-  @Test
-  public void testExcept() throws Exception {
-    String sql = "";
-    sql += "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS1 "
-        + " EXCEPT "
-        + "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS2 ";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            4L, 4, 4.0
-        ).getRows());
-
-    pipeline.run();
-  }
-
-  @Test
-  public void testExceptAll() throws Exception {
-    String sql = "";
-    sql += "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS1 "
-        + " EXCEPT ALL "
-        + "SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS2 ";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).satisfies(new CheckSize(2));
-
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            4L, 4, 4.0,
-            4L, 4, 4.0
-        ).getRows());
-
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java
deleted file mode 100644
index 65dd8af2..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamSetOperatorRelBase}.
- */
-public class BeamSetOperatorRelBaseTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-  public static final Date THE_DATE = new Date(100000);
-
-  @BeforeClass
-  public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price",
-            Types.TIMESTAMP, "order_time"
-        ).addRows(
-            1L, 1, 1.0, THE_DATE,
-            2L, 2, 2.0, THE_DATE
-        )
-    );
-  }
-
-  @Test
-  public void testSameWindow() throws Exception {
-    String sql = "SELECT "
-        + " order_id, site_id, count(*) as cnt "
-        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
-        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
-        + " UNION SELECT "
-        + " order_id, site_id, count(*) as cnt "
-        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
-        + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    // compare valueInString to ignore the windowStart & windowEnd
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                Types.BIGINT, "order_id",
-                Types.INTEGER, "site_id",
-                Types.BIGINT, "cnt"
-            ).addRows(
-                1L, 1, 1L,
-                2L, 2, 1L
-            ).getStringRows());
-    pipeline.run();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testDifferentWindows() throws Exception {
-    String sql = "SELECT "
-        + " order_id, site_id, count(*) as cnt "
-        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
-        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
-        + " UNION SELECT "
-        + " order_id, site_id, count(*) as cnt "
-        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
-        + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
-
-    // use a real pipeline rather than the TestPipeline because we are
-    // testing exceptions, the pipeline will not actually run.
-    Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
-    BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java
deleted file mode 100644
index 9e38bb6..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamSortRel}.
- */
-public class BeamSortRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @Before
-  public void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price",
-            Types.TIMESTAMP, "order_time"
-        ).addRows(
-            1L, 2, 1.0, new Date(),
-            1L, 1, 2.0, new Date(),
-            2L, 4, 3.0, new Date(),
-            2L, 1, 4.0, new Date(),
-            5L, 5, 5.0, new Date(),
-            6L, 6, 6.0, new Date(),
-            7L, 7, 7.0, new Date(),
-            8L, 8888, 8.0, new Date(),
-            8L, 999, 9.0, new Date(),
-            10L, 100, 10.0, new Date()
-        )
-    );
-    sqlEnv.registerTable("SUB_ORDER_RAM",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        )
-    );
-  }
-
-  @Test
-  public void testOrderBy_basic() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS "
-        + "ORDER BY order_id asc, site_id desc limit 4";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
-        Types.BIGINT, "order_id",
-        Types.INTEGER, "site_id",
-        Types.DOUBLE, "price"
-    ).addRows(
-        1L, 2, 1.0,
-        1L, 1, 2.0,
-        2L, 4, 3.0,
-        2L, 1, 4.0
-    ).getRows());
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testOrderBy_nullsFirst() throws Exception {
-    sqlEnv.registerTable("ORDER_DETAILS",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 2, 1.0,
-            1L, null, 2.0,
-            2L, 1, 3.0,
-            2L, null, 4.0,
-            5L, 5, 5.0
-        )
-    );
-    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
-        .of(Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"));
-
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS "
-        + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, null, 2.0,
-            1L, 2, 1.0,
-            2L, null, 4.0,
-            2L, 1, 3.0
-        ).getRows()
-    );
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testOrderBy_nullsLast() throws Exception {
-    sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable
-        .of(Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 2, 1.0,
-            1L, null, 2.0,
-            2L, 1, 3.0,
-            2L, null, 4.0,
-            5L, 5, 5.0));
-    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
-        .of(Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"));
-
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS "
-        + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 2, 1.0,
-            1L, null, 2.0,
-            2L, 1, 3.0,
-            2L, null, 4.0
-        ).getRows()
-    );
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testOrderBy_with_offset() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS "
-        + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            5L, 5, 5.0,
-            6L, 6, 6.0,
-            7L, 7, 7.0,
-            8L, 8888, 8.0
-        ).getRows()
-    );
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testOrderBy_bigFetch() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS "
-        + "ORDER BY order_id asc, site_id desc limit 11";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 2, 1.0,
-            1L, 1, 2.0,
-            2L, 4, 3.0,
-            2L, 1, 4.0,
-            5L, 5, 5.0,
-            6L, 6, 6.0,
-            7L, 7, 7.0,
-            8L, 8888, 8.0,
-            8L, 999, 9.0,
-            10L, 100, 10.0
-        ).getRows()
-    );
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testOrderBy_exception() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id)  SELECT "
-        + " order_id, COUNT(*) "
-        + "FROM ORDER_DETAILS "
-        + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
-        + "ORDER BY order_id asc limit 11";
-
-    TestPipeline pipeline = TestPipeline.create();
-    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java
deleted file mode 100644
index 54524df..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamUnionRel}.
- */
-public class BeamUnionRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @BeforeClass
-  public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS",
-        MockedBoundedTable.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            2L, 2, 2.0
-        )
-    );
-  }
-
-  @Test
-  public void testUnion() throws Exception {
-    String sql = "SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS "
-        + " UNION SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS ";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            2L, 2, 2.0
-        ).getRows()
-    );
-    pipeline.run();
-  }
-
-  @Test
-  public void testUnionAll() throws Exception {
-    String sql = "SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS"
-        + " UNION ALL "
-        + " SELECT order_id, site_id, price "
-        + "FROM ORDER_DETAILS";
-
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.BIGINT, "order_id",
-            Types.INTEGER, "site_id",
-            Types.DOUBLE, "price"
-        ).addRows(
-            1L, 1, 1.0,
-            1L, 1, 1.0,
-            2L, 2, 2.0,
-            2L, 2, 2.0
-        ).getRows()
-    );
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java
deleted file mode 100644
index ace1a3e..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamValuesRel}.
- */
-public class BeamValuesRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @BeforeClass
-  public static void prepare() {
-    sqlEnv.registerTable("string_table",
-        MockedBoundedTable.of(
-            Types.VARCHAR, "name",
-            Types.VARCHAR, "description"
-        )
-    );
-    sqlEnv.registerTable("int_table",
-        MockedBoundedTable.of(
-            Types.INTEGER, "c0",
-            Types.INTEGER, "c1"
-        )
-    );
-  }
-
-  @Test
-  public void testValues() throws Exception {
-    String sql = "insert into string_table(name, description) values "
-        + "('hello', 'world'), ('james', 'bond')";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.VARCHAR, "name",
-            Types.VARCHAR, "description"
-        ).addRows(
-            "hello", "world",
-            "james", "bond"
-        ).getRows()
-    );
-    pipeline.run();
-  }
-
-  @Test
-  public void testValues_castInt() throws Exception {
-    String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.INTEGER, "c0",
-            Types.INTEGER, "c1"
-        ).addRows(
-            1, 2
-        ).getRows()
-    );
-    pipeline.run();
-  }
-
-  @Test
-  public void testValues_onlySelect() throws Exception {
-    String sql = "select 1, '1'";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(
-        TestUtils.RowsBuilder.of(
-            Types.INTEGER, "EXPR$0",
-            Types.CHAR, "EXPR$1"
-        ).addRows(
-            1, "1"
-        ).getRows()
-    );
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java
deleted file mode 100644
index f369076..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.junit.Assert;
-
-/**
- * Utility class to check size of BeamSQLRow iterable.
- */
-public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
-  private int size;
-  public CheckSize(int size) {
-    this.size = size;
-  }
-  @Override public Void apply(Iterable<BeamSqlRow> input) {
-    int count = 0;
-    for (BeamSqlRow row : input) {
-      count++;
-    }
-    Assert.assertEquals(size, count);
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
index 553420b..ddff819 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema;
 import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 4eccc44..05af36c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -19,10 +19,10 @@
 package org.apache.beam.sdk.extensions.sql.schema.kafka;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
index 9dc599f..79e3d6d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
@@ -31,10 +31,10 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
index 571c8ef..821abc9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
@@ -23,12 +23,12 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;