You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/05/24 16:27:06 UTC
flink git commit: [FLINK-9420] [table] Add tests and docs for SQL IN
sub-query operator in streaming
Repository: flink
Updated Branches:
refs/heads/master 88987ebb9 -> c47cc87e5
[FLINK-9420] [table] Add tests and docs for SQL IN sub-query operator in streaming
This closes #6065.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c47cc87e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c47cc87e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c47cc87e
Branch: refs/heads/master
Commit: c47cc87e507f6cb9276c8414c32114ef512cf917
Parents: 88987eb
Author: Xpray <le...@gmail.com>
Authored: Thu May 24 17:44:49 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Thu May 24 18:26:10 2018 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 41 ++++-
docs/dev/table/tableApi.md | 19 +-
.../flink/table/expressions/subquery.scala | 4 -
.../table/api/stream/sql/SetOperatorsTest.scala | 173 +++++++++++++++++++
.../api/stream/table/SetOperatorsTest.scala | 134 ++++++++++++++
.../UnsupportedOpsValidationTest.scala | 7 -
.../runtime/stream/sql/SetOperatorsITCase.scala | 170 ++++++++++++++++++
.../stream/table/SetOperatorsITCase.scala | 121 +++++++++++++
8 files changed, 642 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 470026e..3d6f070 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -388,7 +388,7 @@ GROUP BY users
SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id
{% endhighlight %}
-<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+ <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
<tr>
@@ -410,6 +410,7 @@ FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
{% endhighlight %}
+ <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
<tr>
@@ -458,19 +459,19 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
- <p>Inner Join</p>
+ <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
+ <p>Inner Join</p>
{% highlight sql %}
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
{% endhighlight %}
- <p>Left Outer Join</p>
+ <p>Left Outer Join</p>
{% highlight sql %}
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
{% endhighlight %}
-<p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
+ <p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
</td>
</tr>
</tbody>
@@ -551,10 +552,10 @@ FROM (
<tr>
<td>
<strong>In</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
- Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
+ <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
{% highlight sql %}
SELECT user, amount
FROM Orders
@@ -562,6 +563,25 @@ WHERE product IN (
SELECT product FROM NewProducts
)
{% endhighlight %}
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ <strong>Exists</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+ </td>
+ <td>
+ <p>Returns true if the sub-query returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
+{% highlight sql %}
+SELECT user, amount
+FROM Orders
+WHERE EXISTS (
+ SELECT product FROM NewProducts WHERE NewProducts.product = Orders.product
+)
+{% endhighlight %}
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
</tbody>
@@ -1083,6 +1103,7 @@ EXISTS (sub-query)
</td>
<td>
<p>Returns TRUE if <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
@@ -1093,7 +1114,8 @@ value IN (sub-query)
{% endhighlight %}
</td>
<td>
- <p>Returns TRUE if <i>value</i> is equal to a row returned by sub-query. This operation is not supported in a streaming environment yet.</p>
+ <p>Returns TRUE if <i>value</i> is equal to a row returned by sub-query.</p>
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
@@ -1104,7 +1126,8 @@ value NOT IN (sub-query)
{% endhighlight %}
</td>
<td>
- <p>Returns TRUE if <i>value</i> is not equal to every row returned by sub-query. This operation is not supported in a streaming environment yet.</p>
+ <p>Returns TRUE if <i>value</i> is not equal to every row returned by sub-query.</p>
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 794d3b8..a7b9522 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -817,7 +817,7 @@ Table result = left.minusAll(right);
<tr>
<td>
<strong>In</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
@@ -832,6 +832,8 @@ Table result = left.select("a, b, c").where("a.in(" + right + ")");
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");
{% endhighlight %}
+
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
</tbody>
@@ -942,7 +944,7 @@ val result = left.minusAll(right)
<tr>
<td>
<strong>In</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
@@ -951,6 +953,7 @@ val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
val right = ds2.toTable(tableEnv, 'a)
val result = left.select('a, 'b, 'c).where('a.in(right))
{% endhighlight %}
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
@@ -1793,14 +1796,15 @@ ANY.in(TABLE)
{% endhighlight %}
</td>
<td>
- <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression. Note: This operation is not supported in a streaming environment yet.</p>
+ <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
- ANY.between(lowerBound, upperBound)
+ANY.between(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
@@ -1812,7 +1816,7 @@ ANY.in(TABLE)
<tr>
<td>
{% highlight java %}
- ANY.notBetween(lowerBound, upperBound)
+ANY.notBetween(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
@@ -3358,13 +3362,14 @@ ANY.in(TABLE)
</td>
<td>
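- <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression. Note: This operation is not supported in a streaming environment yet.</p>
+ <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>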
+ <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
<tr>
<td>
{% highlight scala %}
- ANY.between(lowerBound, upperBound)
+ANY.between(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
@@ -3376,7 +3381,7 @@ ANY.in(TABLE)
<tr>
<td>
{% highlight scala %}
- ANY.notBetween(lowerBound, upperBound)
+ANY.notBetween(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
index 6aef5c6..9352365 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala
@@ -54,10 +54,6 @@ case class In(expression: Expression, elements: Seq[Expression]) extends Express
if (elements.length != 1) {
return ValidationFailure("IN operator supports only one table reference.")
}
- if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
- return ValidationFailure(
- "Sub-query IN operator on stream tables is currently not supported.")
- }
val tableOutput = table.logicalPlan.output
if (tableOutput.length > 1) {
return ValidationFailure(
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..266e3ff
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase { // Plan tests: SQL IN sub-queries on stream tables are planned as inner DataStreamJoins against a grouped (de-duplicated) sub-query.
+
+ @Test
+ def testInUncorrelated(): Unit = { // a IN (SELECT x ...) => join of tableA with GroupAggregate(groupBy x) on =(a, x), then a Calc keeping a, b, c
+ val streamUtil = streamTestUtil()
+ streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+ streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+
+ val sqlQuery =
+ s"""
+ |SELECT * FROM tableA
+ |WHERE a IN (SELECT x FROM tableB)
+ """.stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin",
+ streamTableNode(0),
+ unaryNode(
+ "DataStreamGroupAggregate", // groups the sub-query column so each x appears once on the join's right side
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "x")
+ ),
+ term("groupBy", "x"),
+ term("select", "x")
+ ),
+ term("where", "=(a, x)"),
+ term("join", "a", "b", "c", "x"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c") // drops the joined sub-query column again
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testInUncorrelatedWithConditionAndAgg(): Unit = { // HAVING filter is pushed below the SUM aggregate; its result EXPR$0 is grouped again before the join
+ val streamUtil = streamTestUtil()
+ streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+ streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+
+ val sqlQuery =
+ s"""
+ |SELECT * FROM tableA
+ |WHERE a IN (SELECT SUM(x) FROM tableB GROUP BY y HAVING y LIKE '%Hanoi%')
+ """.stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin",
+ streamTableNode(0),
+ unaryNode(
+ "DataStreamGroupAggregate", // outer aggregate: de-duplicates the SUM results
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate", // inner aggregate: SUM(x) per y
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "x", "y"),
+ term("where", "LIKE(y, '%Hanoi%')")
+ ),
+ term("groupBy", "y"),
+ term("select", "y, SUM(x) AS EXPR$0")
+ ),
+ term("select", "EXPR$0")
+ ),
+ term("groupBy", "EXPR$0"),
+ term("select", "EXPR$0")
+ ),
+ term("where", "=(a, EXPR$0)"),
+ term("join", "a", "b", "c", "EXPR$0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testInWithMultiUncorrelatedCondition(): Unit = { // two IN predicates combined with AND => two chained inner joins, one per sub-query
+ val streamUtil = streamTestUtil()
+ streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+ streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+ streamUtil.addTable[(Long, Int)]("tableC", 'w, 'z)
+
+ val sqlQuery =
+ s"""
+ |SELECT * FROM tableA
+ |WHERE a IN (SELECT x FROM tableB)
+ |AND b IN (SELECT w FROM tableC)
+ """.stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin", // outer join handles b IN (SELECT w FROM tableC)
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin", // inner join handles a IN (SELECT x FROM tableB)
+ streamTableNode(0),
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "x")
+ ),
+ term("groupBy", "x"),
+ term("select", "x")
+ ),
+ term("where", "=(a, x)"),
+ term("join", "a", "b", "c", "x"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ ),
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(2),
+ term("select", "w")
+ ),
+ term("groupBy", "w"),
+ term("select", "w")
+ ),
+ term("where", "=(b, w)"),
+ term("join", "a", "b", "c", "w"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
index c0fc05b..e84c630 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -94,4 +94,138 @@ class SetOperatorsTest extends TableTestBase {
util.verifyTable(result, expected)
}
+ @Test
+ def testInUncorrelated(): Unit = { // Table API 'a.in(subTable) => inner join with GroupAggregate(groupBy x) on =(a, x), then a Calc keeping a, b, c
+ val streamUtil = streamTestUtil()
+ val tableA = streamUtil.addTable[(Int, Long, String)]('a, 'b, 'c) // NOTE(review): registered without an explicit name, unlike the sibling tests below
+ val tableB = streamUtil.addTable[(Int, String)]('x, 'y)
+
+ val result = tableA.where('a.in(tableB.select('x)))
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin",
+ streamTableNode(0),
+ unaryNode(
+ "DataStreamGroupAggregate", // de-duplicates the sub-query values before the join
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "x")
+ ),
+ term("groupBy", "x"),
+ term("select", "x")
+ ),
+ term("where", "=(a, x)"),
+ term("join", "a", "b", "c", "x"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ )
+
+ streamUtil.verifyTable(result, expected)
+ }
+
+ @Test
+ def testInUncorrelatedWithConditionAndAgg(): Unit = { // filter on y, SUM(x) per y, then de-duplicate the sums (alias TMP_0) before the inner join on =(a, TMP_0)
+ val streamUtil = streamTestUtil()
+ val tableA = streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+ val tableB = streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+
+ val result = tableA
+ .where('a.in(tableB.where('y.like("%Hanoi%")).groupBy('y).select('x.sum)))
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin",
+ streamTableNode(0),
+ unaryNode(
+ "DataStreamGroupAggregate", // outer aggregate: de-duplicates the SUM results
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate", // inner aggregate: SUM(x) per y
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "x", "y"),
+ term("where", "LIKE(y, '%Hanoi%')")
+ ),
+ term("groupBy", "y"),
+ term("select", "y, SUM(x) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ ),
+ term("groupBy", "TMP_0"),
+ term("select", "TMP_0")
+ ),
+ term("where", "=(a, TMP_0)"),
+ term("join", "a", "b", "c", "TMP_0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ )
+
+ streamUtil.verifyTable(result, expected)
+ }
+
+ @Test
+ def testInWithMultiUncorrelatedCondition(): Unit = { // two IN predicates joined with && => two chained inner joins, one per sub-query
+ val streamUtil = streamTestUtil()
+ val tableA = streamUtil.addTable[(Int, Long, String)]("tableA", 'a, 'b, 'c)
+ val tableB = streamUtil.addTable[(Int, String)]("tableB", 'x, 'y)
+ val tableC = streamUtil.addTable[(Long, Int)]("tableC", 'w, 'z)
+
+ val result = tableA
+ .where('a.in(tableB.select('x)) && 'b.in(tableC.select('w)))
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin", // outer join handles 'b.in(tableC.select('w))
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamJoin", // inner join handles 'a.in(tableB.select('x))
+ streamTableNode(0),
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "x")
+ ),
+ term("groupBy", "x"),
+ term("select", "x")
+ ),
+ term("where", "=(a, x)"),
+ term("join", "a", "b", "c", "x"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ ),
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(2),
+ term("select", "w")
+ ),
+ term("groupBy", "w"),
+ term("select", "w")
+ ),
+ term("where", "=(b, w)"),
+ term("join", "a", "b", "c", "w"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b", "c")
+ )
+
+ streamUtil.verifyTable(result, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index 1de2b1e..91bc780 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -104,11 +104,4 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
t1.fetch(5)
}
- @Test(expected = classOf[ValidationException])
- def testIn(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.select(0.in(t1.select('_1)))
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala
new file mode 100644
index 0000000..718fd3a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SetOperatorsITCase.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class SetOperatorsITCase extends StreamingWithStateTestBase { // End-to-end SQL IN / NOT IN sub-query tests on streams. NOTE(review): no test here configures a state backend on env despite the base class; confirm intended.
+
+ @Test
+ def testInUncorrelatedWithConditionAndAgg(): Unit = { // SUM(x) per y matching '%Hanoi%': Hanoi -> 1+1 = 2, Hanoi-1 -> 2+2-1 = 3, so a IN (2, 3)
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear // reset the shared sink buffers before this test's job runs
+
+ val sqlQuery =
+ s"""
+ |SELECT * FROM tableA
+ |WHERE a IN (SELECT SUM(x) FROM tableB GROUP BY y HAVING y LIKE '%Hanoi%')
+ """.stripMargin
+
+ val dataA = Seq(
+ (1, 1L, "Hello"),
+ (2, 2L, "Hello"),
+ (3, 3L, "Hello World"),
+ (4, 4L, "Hello")
+ )
+
+ val dataB = Seq(
+ (1, "hello"), // does not match '%Hanoi%', so it is filtered out
+ (1, "Hanoi"),
+ (1, "Hanoi"),
+ (2, "Hanoi-1"),
+ (2, "Hanoi-1"),
+ (-1, "Hanoi-1")
+ )
+
+ tEnv.registerTable("tableA",
+ env.fromCollection(dataA).toTable(tEnv).as('a, 'b, 'c))
+
+ tEnv.registerTable("tableB",
+ env.fromCollection(dataB).toTable(tEnv).as('x, 'y))
+
+ val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row] // the grouped sub-query makes the result updating, hence a retract stream
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = Seq(
+ "2,2,Hello", "3,3,Hello World"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) // presumably the net rows after retractions; sorted for an order-insensitive comparison
+ }
+
+ @Test
+ def testInWithMultiUncorrelatedCondition(): Unit = { // a IN {1, 2, 4} AND b IN {1, 2} keeps only rows (1, 1) and (2, 2)
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery =
+ s"""
+ |SELECT * FROM tableA
+ |WHERE a IN (SELECT x FROM tableB)
+ |AND b IN (SELECT w FROM tableC)
+ """.stripMargin
+
+ val dataA = Seq(
+ (1, 1L, "Hello"),
+ (2, 2L, "Hello"),
+ (3, 3L, "Hello World"),
+ (4, 4L, "Hello")
+ )
+
+ val dataB = Seq(
+ (1, "hello"),
+ (2, "co-hello"),
+ (4, "hello")
+ )
+
+ val dataC = Seq(
+ (1L, "Joker"),
+ (1L, "Sanity"), // duplicate w = 1 must not duplicate output rows
+ (2L, "Cool")
+ )
+
+ tEnv.registerTable("tableA",
+ env.fromCollection(dataA).toTable(tEnv).as('a, 'b, 'c))
+
+ tEnv.registerTable("tableB",
+ env.fromCollection(dataB).toTable(tEnv).as('x, 'y))
+
+ tEnv.registerTable("tableC",
+ env.fromCollection(dataC).toTable(tEnv).as('w, 'z))
+
+ val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = Seq(
+ "1,1,Hello", "2,2,Hello"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testNotInUncorrelated(): Unit = { // a NOT IN {1, 2, 4} leaves only the row with a = 3
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery =
+ s"""
+ |SELECT * FROM tableA
+ |WHERE a NOT IN (SELECT x FROM tableB)
+ """.stripMargin
+
+ val dataA = Seq(
+ (1, 1L, "Hello"),
+ (2, 2L, "Hello"),
+ (3, 3L, "Hello World"),
+ (4, 4L, "Hello")
+ )
+
+ val dataB = Seq(
+ (1, "hello"),
+ (2, "co-hello"),
+ (4, "hello")
+ )
+
+ tEnv.registerTable("tableA",
+ env.fromCollection(dataA).toTable(tEnv).as('a, 'b, 'c))
+
+ tEnv.registerTable("tableB",
+ env.fromCollection(dataB).toTable(tEnv).as('x, 'y))
+
+ val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = Seq(
+ "3,3,Hello World"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c47cc87e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 479bce2..f01c191 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -107,4 +107,125 @@ class SetOperatorsITCase extends AbstractTestBase {
val expected = mutable.MutableList("1,(1,a)", "2,(2,b)", "3,(3,c)", "4,(4,d)")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testInUncorrelated(): Unit = { // Table API 'a.in(tableB.select('x)) with x in {1, 2, 4} keeps tableA rows with a = 1, 2 and 4
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear // reset the shared sink buffers before this test's job runs
+
+ val dataA = Seq(
+ (1, 1L, "Hello"),
+ (2, 2L, "Hello"),
+ (3, 3L, "Hello World"),
+ (4, 4L, "Hello")
+ )
+
+ val dataB = Seq(
+ (1, "hello"),
+ (2, "co-hello"),
+ (4, "hello")
+ )
+
+ val tableA = env.fromCollection(dataA).toTable(tEnv, 'a, 'b, 'c)
+
+ val tableB = env.fromCollection(dataB).toTable(tEnv, 'x, 'y)
+
+ val result = tableA.where('a.in(tableB.select('x)))
+
+ val results = result.toRetractStream[Row] // the grouped sub-query makes the result updating, hence a retract stream
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = Seq(
+ "1,1,Hello", "2,2,Hello", "4,4,Hello"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) // sorted for an order-insensitive comparison
+ }
+
+ @Test
+ def testInUncorrelatedWithConditionAndAgg(): Unit = { // sums per y matching '%Hanoi%': Hanoi -> 2, Hanoi-1 -> 3, so only a = 2 and a = 3 survive
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val dataA = Seq(
+ (1, 1L, "Hello"),
+ (2, 2L, "Hello"),
+ (3, 3L, "Hello World"),
+ (4, 4L, "Hello")
+ )
+
+ val dataB = Seq(
+ (1, "hello"), // does not match '%Hanoi%', so it is filtered out
+ (1, "Hanoi"),
+ (1, "Hanoi"),
+ (2, "Hanoi-1"),
+ (2, "Hanoi-1"),
+ (-1, "Hanoi-1")
+ )
+
+ val tableA = env.fromCollection(dataA).toTable(tEnv,'a, 'b, 'c)
+
+ val tableB = env.fromCollection(dataB).toTable(tEnv,'x, 'y)
+
+ val result = tableA
+ .where('a.in(tableB.where('y.like("%Hanoi%")).groupBy('y).select('x.sum)))
+
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = Seq(
+ "2,2,Hello", "3,3,Hello World"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testInWithMultiUncorrelatedCondition(): Unit = { // a in {1, 2, 4} && b in {1, 2} keeps only rows (1, 1) and (2, 2)
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val dataA = Seq(
+ (1, 1L, "Hello"),
+ (2, 2L, "Hello"),
+ (3, 3L, "Hello World"),
+ (4, 4L, "Hello")
+ )
+
+ val dataB = Seq(
+ (1, "hello"),
+ (2, "co-hello"),
+ (4, "hello")
+ )
+
+ val dataC = Seq(
+ (1L, "Joker"),
+ (1L, "Sanity"), // duplicate w = 1 must not duplicate output rows
+ (2L, "Cool")
+ )
+
+ val tableA = env.fromCollection(dataA).toTable(tEnv,'a, 'b, 'c)
+
+ val tableB = env.fromCollection(dataB).toTable(tEnv,'x, 'y)
+
+ val tableC = env.fromCollection(dataC).toTable(tEnv,'w, 'z)
+
+ val result = tableA
+ .where('a.in(tableB.select('x)) && 'b.in(tableC.select('w)))
+
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = Seq(
+ "1,1,Hello", "2,2,Hello"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
}