You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/05/24 16:27:06 UTC

flink git commit: [FLINK-9420] [table] Add tests and docs for SQL IN sub-query operator in streaming

Repository: flink
Updated Branches:
  refs/heads/master 88987ebb9 -> c47cc87e5


[FLINK-9420] [table] Add tests and docs for SQL IN sub-query operator in streaming

This closes #6065.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c47cc87e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c47cc87e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c47cc87e

Branch: refs/heads/master
Commit: c47cc87e507f6cb9276c8414c32114ef512cf917
Parents: 88987eb
Author: Xpray <le...@gmail.com>
Authored: Thu May 24 17:44:49 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Thu May 24 18:26:10 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  41 ++++-
 docs/dev/table/tableApi.md                      |  19 +-
 .../flink/table/expressions/subquery.scala      |   4 -
 .../table/api/stream/sql/SetOperatorsTest.scala | 173 +++++++++++++++++++
 .../api/stream/table/SetOperatorsTest.scala     | 134 ++++++++++++++
 .../UnsupportedOpsValidationTest.scala          |   7 -
 .../runtime/stream/sql/SetOperatorsITCase.scala | 170 ++++++++++++++++++
 .../stream/table/SetOperatorsITCase.scala       | 121 +++++++++++++
 8 files changed, 642 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 470026e..3d6f070 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -388,7 +388,7 @@ GROUP BY users
 SELECT *
 FROM Orders INNER JOIN Product ON Orders.productId = Product.id
 {% endhighlight %}
-<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -410,6 +410,7 @@ FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
 SELECT *
 FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
     <tr>
@@ -458,19 +459,19 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
-      <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
-      <p>Inner Join</p>
+        <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
+        <p>Inner Join</p>
 {% highlight sql %}
 SELECT users, tag
 FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
 {% endhighlight %}
-      <p>Left Outer Join</p>
+        <p>Left Outer Join</p>
 {% highlight sql %}
 SELECT users, tag
 FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
 {% endhighlight %}
 
-<p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
+        <p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
       </td>
     </tr>
   </tbody>
@@ -551,10 +552,10 @@ FROM (
     <tr>
       <td>
         <strong>In</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-      Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
+        <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
 {% highlight sql %}
 SELECT user, amount
 FROM Orders
@@ -562,6 +563,25 @@ WHERE product IN (
     SELECT product FROM NewProducts
 )
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Exists</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Returns true if the sub-query returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
+{% highlight sql %}
+SELECT user, amount
+FROM Orders
+WHERE EXISTS (
+    SELECT product FROM NewProducts WHERE NewProducts.product = Orders.product
+)
+{% endhighlight %}
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -1083,6 +1103,7 @@ EXISTS (sub-query)
       </td>
       <td>
         <p>Returns TRUE if <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
 
@@ -1093,7 +1114,8 @@ value IN (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns TRUE if <i>value</i> is equal to a row returned by sub-query. This operation is not supported in a streaming environment yet.</p>
+        <p>Returns TRUE if <i>value</i> is equal to a row returned by sub-query.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
 
@@ -1104,7 +1126,8 @@ value NOT IN (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns TRUE if <i>value</i> is not equal to every row returned by sub-query. This operation is not supported in a streaming environment yet.</p>
+        <p>Returns TRUE if <i>value</i> is not equal to every row returned by sub-query.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 794d3b8..a7b9522 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -817,7 +817,7 @@ Table result = left.minusAll(right);
     <tr>
       <td>
         <strong>In</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
         <p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
@@ -832,6 +832,8 @@ Table result = left.select("a, b, c").where("a.in(" + right + ")");
 tableEnv.registerTable("RightTable", right);
 Table result = left.select("a, b, c").where("a.in(RightTable)");
 {% endhighlight %}
+
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -942,7 +944,7 @@ val result = left.minusAll(right)
     <tr>
       <td>
         <strong>In</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
         <p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
@@ -951,6 +953,7 @@ val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
 val right = ds2.toTable(tableEnv, 'a)
 val result = left.select('a, 'b, 'c).where('a.in(right))
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
 
@@ -1793,14 +1796,15 @@ ANY.in(TABLE)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression. Note: This operation is not supported in a streaming environment yet.</p>
+        <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight java %}
-    ANY.between(lowerBound, upperBound)
+ANY.between(lowerBound, upperBound)
     {% endhighlight %}
       </td>
       <td>
@@ -1812,7 +1816,7 @@ ANY.in(TABLE)
     <tr>
       <td>
         {% highlight java %}
-      ANY.notBetween(lowerBound, upperBound)
+ANY.notBetween(lowerBound, upperBound)
     {% endhighlight %}
       </td>
       <td>
@@ -3358,13 +3362,14 @@ ANY.in(TABLE)
       </td>
       <td>
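-        <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression. Note: This operation is not supported in a streaming environment yet.</p>
+        <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>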
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight scala %}
-    ANY.between(lowerBound, upperBound)
+ANY.between(lowerBound, upperBound)
     {% endhighlight %}
       </td>
       <td>
@@ -3376,7 +3381,7 @@ ANY.in(TABLE)
     <tr>
       <td>
         {% highlight scala %}
-      ANY.notBetween(lowerBound, upperBound)
+ANY.notBetween(lowerBound, upperBound)
     {% endhighlight %}
       </td>
       <td>

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
index 6aef5c6..9352365 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
@@ -54,10 +54,6 @@ case class In(expression: Expression, elements: Seq[Expression]) extends Express
         if (elements.length != 1) {
           return ValidationFailure("IN operator supports only one table reference.")
         }
-        if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
-          return ValidationFailure(
-            "Sub-query IN operator on stream tables is currently not supported.")
-        }
         val tableOutput = table.logicalPlan.output
         if (tableOutput.length > 1) {
           return ValidationFailure(

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..266e3ff
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+ * Plan tests for uncorrelated IN sub-queries in streaming SQL. Each IN predicate is expected to
+ * be rewritten into an inner DataStreamJoin whose right input is a DataStreamGroupAggregate that
+ * groups on the sub-query column, i.e. one that deduplicates the sub-query values.
+ */
+class SetOperatorsTest extends TableTestBase {
+
+  /** A single IN becomes an inner join of tableA with the distinct x values of tableB. */
+  @Test
+  def testInUncorrelated(): Unit = {
+    val streamUtil = streamTestUtil()
+    streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+    streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+
+    val sqlQuery =
+      s"""
+         |SELECT * FROM tableA
+         |WHERE a IN (SELECT x FROM tableB)
+       """.stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          streamTableNode(0),
+          unaryNode(
+            "DataStreamGroupAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(1),
+              term("select", "x")
+            ),
+            term("groupBy", "x"),
+            term("select", "x")
+          ),
+          term("where", "=(a, x)"),
+          term("join", "a", "b", "c", "x"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b", "c")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  /** The HAVING filter is pushed below SUM; the sums are deduplicated before the join. */
+  @Test
+  def testInUncorrelatedWithConditionAndAgg(): Unit = {
+    val streamUtil = streamTestUtil()
+    streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+    streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+
+    val sqlQuery =
+      s"""
+         |SELECT * FROM tableA
+         |WHERE a IN (SELECT SUM(x) FROM tableB GROUP BY y HAVING y LIKE '%Hanoi%')
+       """.stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          streamTableNode(0),
+          unaryNode(
+            "DataStreamGroupAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              unaryNode(
+                "DataStreamGroupAggregate",
+                unaryNode(
+                  "DataStreamCalc",
+                  streamTableNode(1),
+                  term("select", "x", "y"),
+                  term("where", "LIKE(y, '%Hanoi%')")
+                ),
+                term("groupBy", "y"),
+                term("select", "y, SUM(x) AS EXPR$0")
+              ),
+              term("select", "EXPR$0")
+            ),
+            term("groupBy", "EXPR$0"),
+            term("select", "EXPR$0")
+          ),
+          term("where", "=(a, EXPR$0)"),
+          term("join", "a", "b", "c", "EXPR$0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b", "c")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  /** Two AND-ed IN predicates produce two chained inner joins, one per sub-query. */
+  @Test
+  def testInWithMultiUncorrelatedCondition(): Unit = {
+    val streamUtil = streamTestUtil()
+    streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+    streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+    streamUtil.addTable[(Long, Int)]("tableC", 'w, 'z)
+
+    val sqlQuery =
+      s"""
+         |SELECT * FROM tableA
+         |WHERE a IN (SELECT x FROM tableB)
+         |AND b IN (SELECT w FROM tableC)
+       """.stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          unaryNode(
+            "DataStreamCalc",
+            binaryNode(
+              "DataStreamJoin",
+              streamTableNode(0),
+              unaryNode(
+                "DataStreamGroupAggregate",
+                unaryNode(
+                  "DataStreamCalc",
+                  streamTableNode(1),
+                  term("select", "x")
+                ),
+                term("groupBy", "x"),
+                term("select", "x")
+              ),
+              term("where", "=(a, x)"),
+              term("join", "a", "b", "c", "x"),
+              term("joinType", "InnerJoin")
+            ),
+            term("select", "a", "b", "c")
+          ),
+          unaryNode(
+            "DataStreamGroupAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(2),
+              term("select", "w")
+            ),
+            term("groupBy", "w"),
+            term("select", "w")
+          ),
+          term("where", "=(b, w)"),
+          term("join", "a", "b", "c", "w"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b", "c")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
index c0fc05b..e84c630 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -94,4 +94,138 @@ class SetOperatorsTest extends TableTestBase {
     util.verifyTable(result, expected)
   }
 
+  /** Uncorrelated IN: inner join of tableA with the distinct projection of tableB.x. */
+  @Test
+  def testInUncorrelated(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Int, Long, String)]('a, 'b, 'c)
+    val right = util.addTable[(Int, String)]('x, 'y)
+    val result = left.where('a.in(right.select('x)))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          streamTableNode(0),
+          unaryNode(
+            "DataStreamGroupAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(1),
+              term("select", "x")
+            ),
+            term("groupBy", "x"),
+            term("select", "x")
+          ),
+          term("where", "=(a, x)"),
+          term("join", "a", "b", "c", "x"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b", "c")
+      )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** The filtered per-y sums are grouped once more (deduplicated) before the inner join. */
+  @Test
+  def testInUncorrelatedWithConditionAndAgg(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+    val right = util.addTable[(Int, String)]("tableB", 'x, 'y)
+    val sumsPerY = right.where('y.like("%Hanoi%")).groupBy('y).select('x.sum)
+    val result = left.where('a.in(sumsPerY))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          streamTableNode(0),
+          unaryNode(
+            "DataStreamGroupAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              unaryNode(
+                "DataStreamGroupAggregate",
+                unaryNode(
+                  "DataStreamCalc",
+                  streamTableNode(1),
+                  term("select", "x", "y"),
+                  term("where", "LIKE(y, '%Hanoi%')")
+                ),
+                term("groupBy", "y"),
+                term("select", "y, SUM(x) AS TMP_0")
+              ),
+              term("select", "TMP_0")
+            ),
+            term("groupBy", "TMP_0"),
+            term("select", "TMP_0")
+          ),
+          term("where", "=(a, TMP_0)"),
+          term("join", "a", "b", "c", "TMP_0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b", "c")
+      )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** Two AND-ed IN predicates produce two chained inner joins, one per sub-query. */
+  @Test
+  def testInWithMultiUncorrelatedCondition(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+    val rightB = util.addTable[(Int, String)]("tableB", 'x, 'y)
+    val rightC = util.addTable[(Long, Int)]("tableC", 'w, 'z)
+    val inBoth = 'a.in(rightB.select('x)) && 'b.in(rightC.select('w))
+    val result = left.where(inBoth)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          unaryNode(
+            "DataStreamCalc",
+            binaryNode(
+              "DataStreamJoin",
+              streamTableNode(0),
+              unaryNode(
+                "DataStreamGroupAggregate",
+                unaryNode(
+                  "DataStreamCalc",
+                  streamTableNode(1),
+                  term("select", "x")
+                ),
+                term("groupBy", "x"),
+                term("select", "x")
+              ),
+              term("where", "=(a, x)"),
+              term("join", "a", "b", "c", "x"),
+              term("joinType", "InnerJoin")
+            ),
+            term("select", "a", "b", "c")
+          ),
+          unaryNode(
+            "DataStreamGroupAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(2),
+              term("select", "w")
+            ),
+            term("groupBy", "w"),
+            term("select", "w")
+          ),
+          term("where", "=(b, w)"),
+          term("join", "a", "b", "c", "w"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b", "c")
+      )
+
+    util.verifyTable(result, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index 1de2b1e..91bc780 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -104,11 +104,4 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
     t1.fetch(5)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testIn(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.select(0.in(t1.select('_1)))
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala
new file mode 100644
index 0000000..718fd3a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class SetOperatorsITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testInUncorrelatedWithConditionAndAgg(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery =
+      s"""
+         |SELECT * FROM tableA
+         |WHERE a IN (SELECT SUM(x) FROM tableB GROUP BY y HAVING y LIKE '%Hanoi%')
+       """.stripMargin
+
+    val dataA = Seq(
+      (1, 1L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 3L, "Hello World"),
+      (4, 4L, "Hello")
+    )
+
+    val dataB = Seq(
+      (1, "hello"),
+      (1, "Hanoi"),
+      (1, "Hanoi"),
+      (2, "Hanoi-1"),
+      (2, "Hanoi-1"),
+      (-1, "Hanoi-1")
+    )
+
+    tEnv.registerTable("tableA",
+      env.fromCollection(dataA).toTable(tEnv).as('a, 'b, 'c))
+
+    tEnv.registerTable("tableB",
+      env.fromCollection(dataB).toTable(tEnv).as('x, 'y))
+
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    val expected = Seq(
+      "2,2,Hello", "3,3,Hello World"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInWithMultiUncorrelatedCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery =
+      s"""
+         |SELECT * FROM tableA
+         |WHERE a IN (SELECT x FROM tableB)
+         |AND b IN (SELECT w FROM tableC)
+       """.stripMargin
+
+    val dataA = Seq(
+      (1, 1L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 3L, "Hello World"),
+      (4, 4L, "Hello")
+    )
+
+    val dataB = Seq(
+      (1, "hello"),
+      (2, "co-hello"),
+      (4, "hello")
+    )
+
+    val dataC = Seq(
+      (1L, "Joker"),
+      (1L, "Sanity"),
+      (2L, "Cool")
+    )
+
+    tEnv.registerTable("tableA",
+      env.fromCollection(dataA).toTable(tEnv).as('a, 'b, 'c))
+
+    tEnv.registerTable("tableB",
+      env.fromCollection(dataB).toTable(tEnv).as('x, 'y))
+
+    tEnv.registerTable("tableC",
+      env.fromCollection(dataC).toTable(tEnv).as('w, 'z))
+
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    val expected = Seq(
+      "1,1,Hello", "2,2,Hello"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testNotInUncorrelated(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val sqlQuery =
+      s"""
+         |SELECT * FROM tableA
+         |WHERE a NOT IN (SELECT x FROM tableB)
+       """.stripMargin
+
+    val dataA = Seq(
+      (1, 1L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 3L, "Hello World"),
+      (4, 4L, "Hello")
+    )
+
+    val dataB = Seq(
+      (1, "hello"),
+      (2, "co-hello"),
+      (4, "hello")
+    )
+
+    tEnv.registerTable("tableA",
+      env.fromCollection(dataA).toTable(tEnv).as('a, 'b, 'c))
+
+    tEnv.registerTable("tableB",
+      env.fromCollection(dataB).toTable(tEnv).as('x, 'y))
+
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    val expected = Seq(
+      "3,3,Hello World"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 479bce2..f01c191 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -107,4 +107,125 @@ class SetOperatorsITCase extends AbstractTestBase {
     val expected = mutable.MutableList("1,(1,a)", "2,(2,b)", "3,(3,c)", "4,(4,d)")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test // IN over an uncorrelated sub-query, expressed with the Table API
+  def testInUncorrelated(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val leftRows = Seq(
+      (1, 1L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 3L, "Hello World"),
+      (4, 4L, "Hello")
+    )
+
+    val rightRows = Seq(
+      (1, "hello"),
+      (2, "co-hello"),
+      (4, "hello")
+    )
+
+    val left = env.fromCollection(leftRows).toTable(tEnv, 'a, 'b, 'c)
+    val right = env.fromCollection(rightRows).toTable(tEnv, 'x, 'y)
+
+    // Keep rows of `left` whose `a` occurs among the `x` values {1, 2, 4} of `right`.
+    val filtered = left.where('a.in(right.select('x)))
+
+    filtered
+      .toRetractStream[Row]
+      .addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    // Row 3 is dropped because 3 is not a value of `right.x`.
+    val expected = Seq("1,1,Hello", "2,2,Hello", "4,4,Hello")
+
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test // IN over a filtered, grouped and aggregated uncorrelated sub-query
+  def testInUncorrelatedWithConditionAndAgg(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val dataA = Seq(
+      (1, 1L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 3L, "Hello World"),
+      (4, 4L, "Hello")
+    )
+
+    val dataB = Seq(
+      (1, "hello"),
+      (1, "Hanoi"),
+      (1, "Hanoi"),
+      (2, "Hanoi-1"),
+      (2, "Hanoi-1"),
+      (-1, "Hanoi-1")
+    )
+
+    val tableA = env.fromCollection(dataA).toTable(tEnv, 'a, 'b, 'c)
+
+    val tableB = env.fromCollection(dataB).toTable(tEnv, 'x, 'y)
+    // Sub-query: LIKE drops "hello"; sum(x) per y is Hanoi -> 2 and Hanoi-1 -> 3.
+    val result = tableA
+      .where('a.in(tableB.where('y.like("%Hanoi%")).groupBy('y).select('x.sum)))
+
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    val expected = Seq(
+      "2,2,Hello", "3,3,Hello World"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test // two IN predicates over different uncorrelated sub-queries, combined with AND
+  def testInWithMultiUncorrelatedCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val dataA = Seq(
+      (1, 1L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 3L, "Hello World"),
+      (4, 4L, "Hello")
+    )
+
+    val dataB = Seq(
+      (1, "hello"),
+      (2, "co-hello"),
+      (4, "hello")
+    )
+
+    val dataC = Seq(
+      (1L, "Joker"),
+      (1L, "Sanity"),
+      (2L, "Cool")
+    )
+
+    val tableA = env.fromCollection(dataA).toTable(tEnv, 'a, 'b, 'c)
+
+    val tableB = env.fromCollection(dataB).toTable(tEnv, 'x, 'y)
+
+    val tableC = env.fromCollection(dataC).toTable(tEnv, 'w, 'z)
+    // a IN {1, 2, 4} AND b IN {1, 2}; the duplicate w = 1 must not duplicate row 1.
+    val result = tableA
+      .where('a.in(tableB.select('x)) && 'b.in(tableC.select('w)))
+
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+
+    val expected = Seq(
+      "1,1,Hello", "2,2,Hello"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
 }