You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:46 UTC

[23/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
new file mode 100644
index 0000000..2ca0a98
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamSqlOverlayExpression}: operand acceptance, and results with and without a 4th operand.
+ */
+public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: (VARCHAR, VARCHAR, INTEGER) must be accepted
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlOverlayExpression(operands).accept());
+
+    operands.clear(); // case 2: the same three operands plus a fourth INTEGER must also be accepted
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertTrue(new BeamSqlOverlayExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: no 4th operand
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    Assert.assertEquals("w3resou3rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 2: same inputs with a 4th operand of 4
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    Assert.assertEquals("w3resou33rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 3: 4th operand of 5; the expected value equals the 3-operand result of case 1
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    Assert.assertEquals("w3resou3rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 4: 4th operand of 7
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
+    Assert.assertEquals("w3resouce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
new file mode 100644
index 0000000..a8e3dd2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamSqlPositionExpression}: accepted operand shapes and the values returned by evaluate.
+ */
+public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: (VARCHAR, VARCHAR) must be accepted
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertTrue(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear(); // case 2: (VARCHAR, VARCHAR, INTEGER) must be accepted
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear(); // case 3: an INTEGER first operand must be rejected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertFalse(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear(); // case 4: four operands must be rejected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertFalse(new BeamSqlPositionExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: expects 5, the zero-based offset of "hello" in "worldhello"
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 2: same strings with a third operand of 1; the expected value is unchanged
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 3: no match expects -1. NOTE(review): SQL POSITION is conventionally 1-based with 0 for no match — confirm
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
new file mode 100644
index 0000000..f23a18d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlStringUnaryExpression operand acceptance, exercised via {@code BeamSqlCharLengthExpression}.
+ */
+public class BeamSqlStringUnaryExpressionTest {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: a single VARCHAR operand must be accepted
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertTrue(new BeamSqlCharLengthExpression(operands).accept());
+
+    operands.clear(); // case 2: a single INTEGER operand must be rejected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
+
+    operands.clear(); // case 3: two VARCHAR operands must be rejected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
new file mode 100644
index 0000000..ea929a4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamSqlSubstringExpression}: operand acceptance and results for in-range and boundary arguments.
+ */
+public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: (VARCHAR, INTEGER) must be accepted
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlSubstringExpression(operands).accept());
+
+    operands.clear(); // case 2: (VARCHAR, INTEGER, INTEGER) must be accepted
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertTrue(new BeamSqlSubstringExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: second operand 1, no third operand: whole string expected
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 2: third operand 2: first two characters expected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals("he",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 3: third operand 5, equal to the length of "hello"
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 4: third operand 100, larger than the string: whole string still expected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 5: third operand 0: empty string expected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
+    assertEquals("",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 6: negative third operand (-1): empty string expected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals("",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 7: negative second operand (-1), no third operand: last character expected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals("o",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
new file mode 100644
index 0000000..8b2570e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamSqlTrimExpression}: operand acceptance, evaluation per trim flag, and the static trim helpers.
+ */
+public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: a single VARCHAR operand must be accepted
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
+    assertTrue(new BeamSqlTrimExpression(operands).accept());
+
+    operands.clear(); // case 2: (SYMBOL flag, VARCHAR, VARCHAR) must be accepted
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    assertTrue(new BeamSqlTrimExpression(operands).accept());
+
+    operands.clear(); // case 3: two VARCHAR operands without a flag must be rejected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    assertFalse(new BeamSqlTrimExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // case 1: LEADING flag: leading "he" repetitions removed
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.LEADING));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    Assert.assertEquals("__hehe",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 2: TRAILING flag: trailing "he" repetitions removed
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.TRAILING));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
+    Assert.assertEquals("hehe__",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 3: BOTH flag on an input that contains no "he": input expected unchanged
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"));
+    Assert.assertEquals("__",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+
+    operands.clear(); // case 4: single-operand form: surrounding spaces removed
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
+    Assert.assertEquals("hello",
+        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void leadingTrim() throws Exception { // calls the static helper directly
+    assertEquals("__hehe",
+        BeamSqlTrimExpression.leadingTrim("hehe__hehe", "he"));
+  }
+
+  @Test public void trailingTrim() throws Exception { // calls the static helper directly
+    assertEquals("hehe__",
+        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"));
+  }
+
+  @Test public void trim() throws Exception { // composes trailingTrim then leadingTrim to strip both ends
+    assertEquals("__",
+        BeamSqlTrimExpression.leadingTrim(
+        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he"
+        ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
new file mode 100644
index 0000000..a225cd6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlUpperExpression: evaluating a lower-case VARCHAR must yield its upper-case form.
+ */
+public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>(); // single VARCHAR operand "hello"
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertEquals("HELLO",
+        new BeamSqlUpperExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
new file mode 100644
index 0000000..c7c26eb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.mock;
+
+import static org.apache.beam.sdk.extensions.sql.TestUtils.buildBeamSqlRowType;
+import static org.apache.beam.sdk.extensions.sql.TestUtils.buildRows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Mocked table for bounded data sources: emits rows added via addRows and captures rows written to it.
+ */
+public class MockedBoundedTable extends MockedTable {
+  /** Rows written through {@link OutputStore}; static, so shared by every instance of this class. */
+  private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
+  /** Rows emitted by {@link #buildIOReader(Pipeline)}; filled by {@link #addRows(Object...)}. */
+  private final List<BeamSqlRow> rows = new ArrayList<>();
+
+  public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  /**
+   * Convenient way to build a mocked bounded table.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   * MockedBoundedTable
+   *   .of(Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time")
+   * }</pre>
+   */
+  public static MockedBoundedTable of(final Object... args){
+    return new MockedBoundedTable(buildBeamSqlRowType(args));
+  }
+
+  /**
+   * Build a mocked bounded table with the specified type.
+   */
+  public static MockedBoundedTable of(final BeamSqlRowType type) {
+    return new MockedBoundedTable(type);
+  }
+
+
+  /**
+   * Appends rows to this table and returns {@code this} so calls can be chained.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>{@code
+   * addRows(
+   *   1, 3, "james", -- first row
+   *   2, 5, "bond"   -- second row
+   *   ...
+   * )
+   * }</pre>
+   */
+  public MockedBoundedTable addRows(Object... args) {
+    List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); // local list shadows the field of the same name
+    this.rows.addAll(rows);
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply( // the counter suffix gives every reader transform a distinct name
+        "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
+  }
+
+  @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    return new OutputStore();
+  }
+
+  /**
+   * Writer transform that appends every input row to {@code CONTENT} for validation.
+   *
+   */
+  public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
+
+    @Override
+    public PDone expand(PCollection<BeamSqlRow> input) {
+      input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          CONTENT.add(c.element());
+        }
+
+        @Teardown
+        public void close() { // NOTE(review): empties the shared static queue on teardown — confirm validators read CONTENT before this runs
+          CONTENT.clear();
+        }
+
+      }));
+      return PDone.in(input.getPipeline());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
new file mode 100644
index 0000000..6017ee7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.mock;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Base class for mocked tables; provides a shared counter and a writer that is unsupported by default.
+ */
+public abstract class MockedTable extends BaseBeamTable {
+  public static final AtomicInteger COUNTER = new AtomicInteger(); // static: one counter shared by all mocked tables
+  public MockedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { // always throws; subclasses that support writes override this
+    throw new UnsupportedOperationException("buildIOWriter unsupported!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
new file mode 100644
index 0000000..f9ea2ac
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.calcite.util.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A mocked unbounded table.
+ */
+public class MockedUnboundedTable extends MockedTable {
+  /** rows flow out from this table with the specified watermark instant. */
+  private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+  /** specify the index of column in the row which stands for the event time field. */
+  private int timestampField;
+  private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  /**
+   * Convenient way to build a mocked unbounded table.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   * MockedUnboundedTable
+   *   .of(Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time")
+   * }</pre>
+   */
+  public static MockedUnboundedTable of(final Object... args){
+    return new MockedUnboundedTable(TestUtils.buildBeamSqlRowType(args));
+  }
+
+  public MockedUnboundedTable timestampColumnIndex(int idx) {
+    this.timestampField = idx;
+    return this;
+  }
+
+  /**
+   * Add rows to the builder.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>{@code
+   * addRows(
+   *   duration,      -- duration which stands for the corresponding watermark instant
+   *   1, 3, "james", -- first row
+   *   2, 5, "bond"   -- second row
+   *   ...
+   * )
+   * }</pre>
+   */
+  public MockedUnboundedTable addRows(Duration duration, Object... args) {
+    List<BeamSqlRow> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args));
+    // record the watermark + rows
+    this.timestampedRows.add(Pair.of(duration, rows));
+    return this;
+  }
+
+  @Override public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    TestStream.Builder<BeamSqlRow> values = TestStream.create(
+        new BeamSqlRowCoder(beamSqlRowType));
+
+    for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+      values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
+      for (int i = 0; i < pair.getValue().size(); i++) {
+        values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
+            new Instant(pair.getValue().get(i).getDate(timestampField))));
+      }
+    }
+
+    return pipeline.begin().apply(
+        "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
+        values.advanceWatermarkToInfinity());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java
new file mode 100644
index 0000000..7b8d9a4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamIntersectRel}: {@code INTERSECT} and {@code INTERSECT ALL} queries over
+ * two mocked bounded tables.
+ */
+public class BeamIntersectRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /** Registers both input tables; ORDER_DETAILS1 contains the row (1, 1, 1.0) twice. */
+  @BeforeClass
+  public static void prepare() {
+    MockedBoundedTable firstTable =
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            4L, 4, 4.0
+        );
+    sqlEnv.registerTable("ORDER_DETAILS1", firstTable);
+
+    MockedBoundedTable secondTable =
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            3L, 3, 3.0
+        );
+    sqlEnv.registerTable("ORDER_DETAILS2", secondTable);
+  }
+
+  /** {@code INTERSECT} is expected to return each common row exactly once. */
+  @Test
+  public void testIntersect() throws Exception {
+    String query = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " INTERSECT "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> result = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+    PAssert.that(result).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getRows());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * {@code INTERSECT ALL} is expected to yield three rows, keeping the duplicated (1, 1, 1.0)
+   * row of ORDER_DETAILS1.
+   */
+  @Test
+  public void testIntersectAll() throws Exception {
+    String query = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " INTERSECT ALL "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> result = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    // check the row count as well as the row content
+    PAssert.that(result).satisfies(new CheckSize(3));
+    PAssert.that(result).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getRows());
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
new file mode 100644
index 0000000..2acee82
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamJoinRel} where both join inputs are bounded tables.
+ *
+ * <p>Both tables hold the same three rows; the equi-join condition used by the tests
+ * ({@code o1.order_id=o2.site_id AND o2.price=o1.site_id}) matches exactly one pair of rows.
+ */
+public class BeamJoinRelBoundedVsBoundedTest {
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+
+  /** Left-hand input of the joins. */
+  public static final MockedBoundedTable ORDER_DETAILS1 =
+      MockedBoundedTable.of(
+          Types.INTEGER, "order_id",
+          Types.INTEGER, "site_id",
+          Types.INTEGER, "price"
+      ).addRows(
+          1, 2, 3,
+          2, 3, 3,
+          3, 4, 5
+      );
+
+  /** Right-hand input of the joins; same content as {@link #ORDER_DETAILS1}. */
+  public static final MockedBoundedTable ORDER_DETAILS2 =
+      MockedBoundedTable.of(
+          Types.INTEGER, "order_id",
+          Types.INTEGER, "site_id",
+          Types.INTEGER, "price"
+      ).addRows(
+          1, 2, 3,
+          2, 3, 3,
+          3, 4, 5
+      );
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /** Makes both tables visible to the SQL environment under their constant names. */
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+  }
+
+  /** An inner join is expected to return only the single matching pair. */
+  @Test
+  public void testInnerJoin() throws Exception {
+    String query =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            2, 3, 3, 1, 2, 3
+        ).getRows());
+    pipeline.run();
+  }
+
+  /** A left outer join is expected to pad unmatched left rows with nulls on the right. */
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String query =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PAssert.that(joined).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            1, 2, 3, null, null, null,
+            2, 3, 3, 1, 2, 3,
+            3, 4, 5, null, null, null
+        ).getRows());
+    pipeline.run();
+  }
+
+  /** A right outer join is expected to pad unmatched right rows with nulls on the left. */
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String query =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
+    pipeline.run();
+  }
+
+  /** A full outer join is expected to keep the unmatched rows of both sides. */
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String query =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " FULL OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            1, 2, 3, null, null, null,
+            3, 4, 5, null, null, null,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
+    pipeline.run();
+  }
+
+  /** A join on a non-equality condition is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testException_nonEqualJoin() throws Exception {
+    String query =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id>o2.site_id";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  /** A cross join (no join condition) is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testException_crossJoin() throws Exception {
+    String query =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
new file mode 100644
index 0000000..e226b70
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Bounded Test for {@code BeamJoinRel}.
+ *
+ * <p>The unbounded side is ORDER_DETAILS aggregated per {@code order_id} over one-hour tumbling
+ * windows; the bounded side is ORDER_DETAILS1, which maps an {@code order_id} to a buyer.
+ */
+public class BeamJoinRelUnboundedVsBoundedTest {
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+
+  /** Event times used by the unbounded rows; the test comments place them in separate windows. */
+  public static final Date FIRST_DATE = new Date(1);
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+  public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /** Registers the unbounded ORDER_DETAILS table and the bounded ORDER_DETAILS1 table. */
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS",
+        MockedUnboundedTable
+            .of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "site_id",
+                Types.INTEGER, "price",
+                Types.TIMESTAMP, "order_time"
+            )
+            .timestampColumnIndex(3)
+            .addRows(
+                Duration.ZERO,
+                1, 1, 1, FIRST_DATE,
+                1, 2, 2, FIRST_DATE
+            )
+            .addRows(
+                WINDOW_SIZE.plus(Duration.standardSeconds(1)),
+                2, 2, 3, SECOND_DATE,
+                2, 3, 3, SECOND_DATE,
+                // this late data is omitted
+                1, 2, 3, FIRST_DATE
+            )
+            .addRows(
+                WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)),
+                3, 3, 3, THIRD_DATE,
+                // this late data is omitted
+                2, 2, 3, SECOND_DATE
+            )
+    );
+
+    beamSqlEnv.registerTable("ORDER_DETAILS1",
+        MockedBoundedTable
+            .of(
+                Types.INTEGER, "order_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, "james",
+                2, "bond"
+            ));
+  }
+
+  /** Inner join with the unbounded aggregate on the left and the bounded table on the right. */
+  @Test
+  public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond"
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /** Inner join with the bounded table on the left and the unbounded aggregate on the right. */
+  @Test
+  public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond"
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Left outer join with the unbounded aggregate on the left: the aggregate row without a buyer
+   * is expected with a null {@code buyer}.
+   */
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    // additionally routes the joined rows through the console output fn
+    joined.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond",
+                3, 3, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /** Left outer join with the bounded table on the left is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testLeftOuterJoinError() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  /**
+   * Right outer join with the unbounded aggregate on the right: the aggregate row without a
+   * buyer is expected with a null {@code buyer}.
+   */
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " RIGHT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).addRows(
+                1, 3, "james",
+                2, 5, "bond",
+                3, 3, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /** Right outer join with the bounded table on the right is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testRightOuterJoinError() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  /** Full outer join between a bounded and an unbounded input is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testFullOuterJoinError() throws Exception {
+    String query = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
new file mode 100644
index 0000000..c366a6e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamJoinRel} where both join inputs are unbounded.
+ *
+ * <p>Every query joins two windowed aggregations of the same mocked ORDER_DETAILS stream.
+ */
+public class BeamJoinRelUnboundedVsUnboundedTest {
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+
+  /** Event times used by the rows; the test comments place them in separate windows. */
+  public static final Date FIRST_DATE = new Date(1);
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /** Registers the unbounded ORDER_DETAILS table, including two late records. */
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS",
+        MockedUnboundedTable
+            .of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "site_id",
+                Types.INTEGER, "price",
+                Types.TIMESTAMP, "order_time"
+            )
+            .timestampColumnIndex(3)
+            .addRows(
+                Duration.ZERO,
+                1, 1, 1, FIRST_DATE,
+                1, 2, 6, FIRST_DATE
+            )
+            .addRows(
+                WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+                2, 2, 7, SECOND_DATE,
+                2, 3, 8, SECOND_DATE,
+                // this late record is omitted(First window)
+                1, 3, 3, FIRST_DATE
+            )
+            .addRows(
+                WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+                // this late record is omitted(Second window)
+                2, 3, 3, SECOND_DATE
+            )
+    );
+  }
+
+  /** Inner join of the per-{@code order_id} aggregate with itself. */
+  @Test
+  public void testInnerJoin() throws Exception {
+    String query = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 3, 1, 3,
+                2, 5, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Left outer join: the left side is grouped by {@code site_id}, the right side by
+   * {@code order_id}.
+   */
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String query = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    // expected output, per window (left | right):
+    // 1, 1 | 1, 3
+    // 2, 2 | NULL, NULL
+    // ---- | -----
+    // 2, 2 | 2, 5
+    // 3, 3 | NULL, NULL
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 1, 1, 3,
+                2, 2, null, null,
+                2, 2, 2, 5,
+                3, 3, null, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Right outer join: the left side is grouped by {@code order_id}, the right side by
+   * {@code site_id}.
+   */
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String query = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 3, 1, 1,
+                null, null, 2, 2,
+                2, 5, 2, 2,
+                null, null, 3, 3
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /**
+   * Full outer join: the left side is grouped by {@code price}, the right side by
+   * {@code order_id}.
+   */
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String query = "SELECT * FROM "
+        + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id1=o2.order_id";
+
+    PCollection<BeamSqlRow> joined = BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    // additionally routes the joined rows through the console output fn
+    joined.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
+    PAssert.that(joined.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id1",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id0"
+            ).addRows(
+                1, 1, 1, 3,
+                6, 2, null, null,
+                7, 2, null, null,
+                8, 3, null, null,
+                null, null, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  /** Joining a two-hour window with a one-hour window is expected to be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testWindowsMismatch() throws Exception {
+    String query = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(query, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java
new file mode 100644
index 0000000..f2ed132
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamMinusRel}.
+ *
+ * <p>Runs {@code EXCEPT} and {@code EXCEPT ALL} queries over two mocked bounded tables.
+ */
+public class BeamMinusRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Registers the two bounded input tables that both queries read from.
+   */
+  @BeforeClass
+  public static void prepare() {
+    // Left-hand input: orders 1 and 4 each appear twice, order 2 once.
+    sqlEnv.registerTable(
+        "ORDER_DETAILS1",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 1, 1.0,
+                1L, 1, 1.0,
+                2L, 2, 2.0,
+                4L, 4, 4.0,
+                4L, 4, 4.0));
+
+    // Right-hand input: orders 1, 2 and 3, one row each.
+    sqlEnv.registerTable(
+        "ORDER_DETAILS2",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 1, 1.0,
+                2L, 2, 2.0,
+                3L, 3, 3.0));
+  }
+
+  /**
+   * {@code EXCEPT} is expected to yield a single row for order 4.
+   */
+  @Test
+  public void testExcept() throws Exception {
+    final String query = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " EXCEPT "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+    PCollection<BeamSqlRow> result = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(result).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(4L, 4, 4.0)
+            .getRows());
+
+    pipeline.run();
+  }
+
+  /**
+   * {@code EXCEPT ALL} is expected to yield both rows for order 4.
+   */
+  @Test
+  public void testExceptAll() throws Exception {
+    final String query = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " EXCEPT ALL "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+    PCollection<BeamSqlRow> result = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    // CheckSize(2) presumably asserts the row count; the second check pins the contents.
+    PAssert.that(result).satisfies(new CheckSize(2));
+    PAssert.that(result).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                4L, 4, 4.0,
+                4L, 4, 4.0)
+            .getRows());
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java
new file mode 100644
index 0000000..65dd8af2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSetOperatorRelBase}.
+ *
+ * <p>Unions two windowed aggregations over the same table: once with identical windows,
+ * once with different window sizes, which is expected to fail.
+ */
+public class BeamSetOperatorRelBaseTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  public static final Date THE_DATE = new Date(100000);
+
+  /** Registers {@code ORDER_DETAILS}; both of its rows carry {@link #THE_DATE}. */
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price",
+            Types.TIMESTAMP, "order_time"
+        ).addRows(
+            1L, 1, 1.0, THE_DATE,
+            2L, 2, 2.0, THE_DATE));
+  }
+
+  /** Both sides tumble over one hour; the union is expected to yield one row per order. */
+  @Test
+  public void testSameWindow() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+    // Compare string renderings of the rows so that windowStart and windowEnd are ignored.
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder
+                .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.BIGINT, "cnt")
+                .addRows(
+                    1L, 1, 1L,
+                    2L, 2, 1L)
+                .getStringRows());
+    pipeline.run();
+  }
+
+  /** The sides tumble over one and two hours; compilation is expected to throw. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testDifferentWindows() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
+    // Use a plain Pipeline instead of the TestPipeline rule: compilation is expected to
+    // throw, so nothing is executed. Any run must target the pipeline that was compiled.
+    Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
+    BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
+    pipeline1.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java
new file mode 100644
index 0000000..9e38bb6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSortRel}.
+ *
+ * <p>Covers {@code ORDER BY ... LIMIT} with mixed sort directions, explicit null ordering,
+ * an offset, and a limit larger than the input, plus one query that is expected to fail.
+ */
+public class BeamSortRelTest {
+  /** Leading part shared by every three-column {@code INSERT ... SELECT} query below. */
+  private static final String INSERT_ALL_COLUMNS =
+      "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
+          + " order_id, site_id, price "
+          + "FROM ORDER_DETAILS ";
+
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Registers the ten-row {@code ORDER_DETAILS} source and a row-less {@code SUB_ORDER_RAM}
+   * target before each test.
+   */
+  @Before
+  public void prepare() {
+    sqlEnv.registerTable(
+        "ORDER_DETAILS",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id",
+                Types.INTEGER, "site_id",
+                Types.DOUBLE, "price",
+                Types.TIMESTAMP, "order_time")
+            .addRows(
+                1L, 2, 1.0, new Date(),
+                1L, 1, 2.0, new Date(),
+                2L, 4, 3.0, new Date(),
+                2L, 1, 4.0, new Date(),
+                5L, 5, 5.0, new Date(),
+                6L, 6, 6.0, new Date(),
+                7L, 7, 7.0, new Date(),
+                8L, 8888, 8.0, new Date(),
+                8L, 999, 9.0, new Date(),
+                10L, 100, 10.0, new Date()));
+
+    sqlEnv.registerTable(
+        "SUB_ORDER_RAM",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price"));
+  }
+
+  /** Ascending {@code order_id}, descending {@code site_id}, limited to four rows. */
+  @Test
+  public void testOrderBy_basic() throws Exception {
+    final String query = INSERT_ALL_COLUMNS
+        + "ORDER BY order_id asc, site_id desc limit 4";
+    PCollection<BeamSqlRow> sorted = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(sorted).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 2, 1.0,
+                1L, 1, 2.0,
+                2L, 4, 3.0,
+                2L, 1, 4.0)
+            .getRows());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** {@code NULLS FIRST} on {@code site_id}, with null values present in the input. */
+  @Test
+  public void testOrderBy_nullsFirst() throws Exception {
+    // Register three-column tables under the same names; the source has null site_ids.
+    sqlEnv.registerTable(
+        "ORDER_DETAILS",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 2, 1.0,
+                1L, null, 2.0,
+                2L, 1, 3.0,
+                2L, null, 4.0,
+                5L, 5, 5.0));
+    sqlEnv.registerTable(
+        "SUB_ORDER_RAM",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price"));
+
+    final String query = INSERT_ALL_COLUMNS
+        + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
+    PCollection<BeamSqlRow> sorted = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(sorted).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, null, 2.0,
+                1L, 2, 1.0,
+                2L, null, 4.0,
+                2L, 1, 3.0)
+            .getRows());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** {@code NULLS LAST} on {@code site_id}, with null values present in the input. */
+  @Test
+  public void testOrderBy_nullsLast() throws Exception {
+    // Register three-column tables under the same names; the source has null site_ids.
+    sqlEnv.registerTable(
+        "ORDER_DETAILS",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 2, 1.0,
+                1L, null, 2.0,
+                2L, 1, 3.0,
+                2L, null, 4.0,
+                5L, 5, 5.0));
+    sqlEnv.registerTable(
+        "SUB_ORDER_RAM",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price"));
+
+    final String query = INSERT_ALL_COLUMNS
+        + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
+    PCollection<BeamSqlRow> sorted = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(sorted).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 2, 1.0,
+                1L, null, 2.0,
+                2L, 1, 3.0,
+                2L, null, 4.0)
+            .getRows());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** {@code limit 4 offset 4}: expects the fifth through eighth rows of the ordering. */
+  @Test
+  public void testOrderBy_with_offset() throws Exception {
+    final String query = INSERT_ALL_COLUMNS
+        + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
+    PCollection<BeamSqlRow> sorted = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(sorted).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                5L, 5, 5.0,
+                6L, 6, 6.0,
+                7L, 7, 7.0,
+                8L, 8888, 8.0)
+            .getRows());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** A limit of 11 exceeds the ten input rows; all ten rows are expected back. */
+  @Test
+  public void testOrderBy_bigFetch() throws Exception {
+    final String query = INSERT_ALL_COLUMNS
+        + "ORDER BY order_id asc, site_id desc limit 11";
+    PCollection<BeamSqlRow> sorted = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(sorted).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 2, 1.0,
+                1L, 1, 2.0,
+                2L, 4, 3.0,
+                2L, 1, 4.0,
+                5L, 5, 5.0,
+                6L, 6, 6.0,
+                7L, 7, 7.0,
+                8L, 8888, 8.0,
+                8L, 999, 9.0,
+                10L, 100, 10.0)
+            .getRows());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Compiling {@code ORDER BY} over a windowed aggregation is expected to throw.
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testOrderBy_exception() throws Exception {
+    final String query = "INSERT INTO SUB_ORDER_RAM(order_id, site_id)  SELECT "
+        + " order_id, COUNT(*) "
+        + "FROM ORDER_DETAILS "
+        + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
+        + "ORDER BY order_id asc limit 11";
+
+    // Separate pipeline instance, distinct from the rule field; it is compiled but never run.
+    TestPipeline compileOnlyPipeline = TestPipeline.create();
+    BeamSqlCli.compilePipeline(query, compileOnlyPipeline, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java
new file mode 100644
index 0000000..54524df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rel;
+
+import java.sql.Types;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamUnionRel}.
+ *
+ * <p>Unions the mocked {@code ORDER_DETAILS} table with itself, with and without {@code ALL}.
+ */
+public class BeamUnionRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Registers the two-row {@code ORDER_DETAILS} table that both tests query.
+   */
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable(
+        "ORDER_DETAILS",
+        MockedBoundedTable
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 1, 1.0,
+                2L, 2, 2.0));
+  }
+
+  /** {@code UNION} of the table with itself is expected to return each row once. */
+  @Test
+  public void testUnion() throws Exception {
+    final String query = "SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + " UNION SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS ";
+    PCollection<BeamSqlRow> unioned = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(unioned).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 1, 1.0,
+                2L, 2, 2.0)
+            .getRows());
+
+    pipeline.run();
+  }
+
+  /** {@code UNION ALL} of the table with itself is expected to return each row twice. */
+  @Test
+  public void testUnionAll() throws Exception {
+    final String query = "SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS"
+        + " UNION ALL "
+        + " SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS";
+    PCollection<BeamSqlRow> unioned = BeamSqlCli.compilePipeline(query, pipeline, sqlEnv);
+
+    PAssert.that(unioned).containsInAnyOrder(
+        TestUtils.RowsBuilder
+            .of(Types.BIGINT, "order_id", Types.INTEGER, "site_id", Types.DOUBLE, "price")
+            .addRows(
+                1L, 1, 1.0,
+                1L, 1, 1.0,
+                2L, 2, 2.0,
+                2L, 2, 2.0)
+            .getRows());
+
+    pipeline.run();
+  }
+}