You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/23 20:03:12 UTC
[5/5] flink git commit: [FLINK-5143] [table] Add EXISTS to list of supported SQL operators.
[FLINK-5143] [table] Add EXISTS to list of supported SQL operators.
This closes #2853.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06d252e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06d252e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06d252e8
Branch: refs/heads/master
Commit: 06d252e89b58c5947b331cf24a552d11ff8767e8
Parents: 7d91b9e
Author: twalthr <tw...@apache.org>
Authored: Wed Nov 23 10:44:20 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100
----------------------------------------------------------------------
docs/dev/table_api.md | 18 ++---
.../table/plan/nodes/dataset/DataSetJoin.scala | 2 +-
.../api/table/validate/FunctionCatalog.scala | 3 +-
.../api/scala/batch/sql/SetOperatorsTest.scala | 75 ++++++++++++++++++++
.../flink/api/table/utils/TableTestBase.scala | 26 +++++++
5 files changed, 114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index bb0e500..4d8c953 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -2800,39 +2800,41 @@ value NOT IN (value [, value]* )
<p>Whether <i>value</i> is not equal to every value in a list.</p>
</td>
</tr>
-<!-- NOT SUPPORTED SO FAR
+
<tr>
<td>
{% highlight text %}
-value IN (sub-query)
+EXISTS (sub-query)
{% endhighlight %}
</td>
<td>
- <p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
+ <p>Whether <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten as a join and group operation.</p>
</td>
</tr>
+<!-- NOT SUPPORTED SO FAR
<tr>
<td>
{% highlight text %}
-value NOT IN (sub-query)
+value IN (sub-query)
{% endhighlight %}
</td>
<td>
- <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
+ <p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
-EXISTS (sub-query)
+value NOT IN (sub-query)
{% endhighlight %}
</td>
<td>
- <p>Whether sub-query returns at least one row.</p>
+ <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
</td>
- </tr>-->
+ </tr>
+ -->
</tbody>
</table>
http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index bbb6325..6d7a30e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -215,7 +215,7 @@ class DataSetJoin(
}
private def joinTypeToString = joinType match {
- case JoinRelType.INNER => "Join"
+ case JoinRelType.INNER => "InnerJoin"
case JoinRelType.LEFT=> "LeftOuterJoin"
case JoinRelType.RIGHT => "RightOuterJoin"
case JoinRelType.FULL => "FullOuterJoin"
http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 68e2f97..679733c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -281,7 +281,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.CAST,
SqlStdOperatorTable.EXTRACT,
SqlStdOperatorTable.QUARTER,
- SqlStdOperatorTable.SCALAR_QUERY
+ SqlStdOperatorTable.SCALAR_QUERY,
+ SqlStdOperatorTable.EXISTS
)
builtInSqlOperators.foreach(register)
http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..5bc6e4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase {
+
+ @Test
+ def testExists(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
+ util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ batchTableNode(1),
+ unaryNode(
+ "DataSetAggregate",
+ batchTableNode(0),
+ term("groupBy", "a_long"),
+ term("select", "a_long")
+ ),
+ term("where", "=(a_long, b_long)"),
+ term("join", "b_long", "b_int", "b_string", "a_long"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a_long", "true AS $f0")
+ ),
+ term("groupBy", "a_long"),
+ term("select", "a_long", "MIN($f0) AS $f1")
+ ),
+ term("where", "=(a_long, a_long0)"),
+ term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a_int", "a_string")
+ )
+
+ util.verifySql(
+ "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
+ expected
+ )
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index ce693ff..2ea15a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -56,6 +56,10 @@ abstract class TableTestUtil {
def addTable[T: TypeInformation](name: String, fields: Expression*): Table
def verifySql(query: String, expected: String): Unit
def verifyTable(resultTable: Table, expected: String): Unit
+
+ // the print methods are for debugging purposes only
+ def printTable(resultTable: Table): Unit
+ def printSql(query: String): Unit
}
object TableTestUtil {
@@ -87,6 +91,7 @@ object TableTestUtil {
def streamTableNode(idx: Int): String = {
s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
}
+
}
case class BatchTableTestUtil() extends TableTestUtil {
@@ -121,6 +126,16 @@ case class BatchTableTestUtil() extends TableTestUtil {
expected.split("\n").map(_.trim).mkString("\n"),
actual.split("\n").map(_.trim).mkString("\n"))
}
+
+ def printTable(resultTable: Table): Unit = {
+ val relNode = resultTable.getRelNode
+ val optimized = tEnv.optimize(relNode)
+ println(RelOptUtil.toString(optimized))
+ }
+
+ def printSql(query: String): Unit = {
+ printTable(tEnv.sql(query))
+ }
}
case class StreamTableTestUtil() extends TableTestUtil {
@@ -156,4 +171,15 @@ case class StreamTableTestUtil() extends TableTestUtil {
expected.split("\n").map(_.trim).mkString("\n"),
actual.split("\n").map(_.trim).mkString("\n"))
}
+
+ // the print methods are for debugging purposes only
+ def printTable(resultTable: Table): Unit = {
+ val relNode = resultTable.getRelNode
+ val optimized = tEnv.optimize(relNode)
+ println(RelOptUtil.toString(optimized))
+ }
+
+ def printSql(query: String): Unit = {
+ printTable(tEnv.sql(query))
+ }
}