You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/23 20:03:12 UTC

[5/5] flink git commit: [FLINK-5143] [table] Add EXISTS to list of supported SQL operators.

[FLINK-5143] [table] Add EXISTS to list of supported SQL operators.

This closes #2853.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06d252e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06d252e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06d252e8

Branch: refs/heads/master
Commit: 06d252e89b58c5947b331cf24a552d11ff8767e8
Parents: 7d91b9e
Author: twalthr <tw...@apache.org>
Authored: Wed Nov 23 10:44:20 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 18 ++---
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  2 +-
 .../api/table/validate/FunctionCatalog.scala    |  3 +-
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 75 ++++++++++++++++++++
 .../flink/api/table/utils/TableTestBase.scala   | 26 +++++++
 5 files changed, 114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index bb0e500..4d8c953 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -2800,39 +2800,41 @@ value NOT IN (value [, value]* )
         <p>Whether <i>value</i> is not equal to every value in a list.</p>
       </td>
     </tr>
-<!-- NOT SUPPORTED SO FAR
+
     <tr>
       <td>
         {% highlight text %}
-value IN (sub-query)
+EXISTS (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
+        <p>Whether <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten as a join and group operation.</p>
       </td>
     </tr>
 
+<!-- NOT SUPPORTED SO FAR
     <tr>
       <td>
         {% highlight text %}
-value NOT IN (sub-query)
+value IN (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
+        <p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight text %}
-EXISTS (sub-query)
+value NOT IN (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Whether sub-query returns at least one row.</p>
+        <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
       </td>
-    </tr>-->
+    </tr>
+    -->
 
   </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index bbb6325..6d7a30e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -215,7 +215,7 @@ class DataSetJoin(
   }
 
   private def joinTypeToString = joinType match {
-    case JoinRelType.INNER => "Join"
+    case JoinRelType.INNER => "InnerJoin"
     case JoinRelType.LEFT=> "LeftOuterJoin"
     case JoinRelType.RIGHT => "RightOuterJoin"
     case JoinRelType.FULL => "FullOuterJoin"

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 68e2f97..679733c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -281,7 +281,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.CAST,
     SqlStdOperatorTable.EXTRACT,
     SqlStdOperatorTable.QUARTER,
-    SqlStdOperatorTable.SCALAR_QUERY
+    SqlStdOperatorTable.SCALAR_QUERY,
+    SqlStdOperatorTable.EXISTS
   )
 
   builtInSqlOperators.foreach(register)

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..5bc6e4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase { // batch SQL plan tests; currently only EXISTS
+
+  @Test
+  def testExists(): Unit = { // correlated EXISTS is expected to plan as joins + aggregations
+    val util = batchTestUtil()
+    util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
+    util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)
+
+    val expected = unaryNode( // Calc(Join(A, Aggregate(Calc(Join(B, Aggregate(A))))))
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        batchTableNode(0),
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetCalc",
+            binaryNode(
+              "DataSetJoin",
+              batchTableNode(1),
+              unaryNode(
+                "DataSetAggregate", // distinct a_long values of A (correlation keys)
+                batchTableNode(0),
+                term("groupBy", "a_long"),
+                term("select", "a_long")
+              ),
+              term("where", "=(a_long, b_long)"),
+              term("join", "b_long", "b_int", "b_string", "a_long"),
+              term("joinType", "InnerJoin")
+            ),
+            term("select", "a_long", "true AS $f0")
+          ),
+          term("groupBy", "a_long"), // one row per a_long that has a matching b_long
+          term("select", "a_long", "MIN($f0) AS $f1")
+        ),
+        term("where", "=(a_long, a_long0)"),
+        term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"),
+        term("joinType", "InnerJoin")
+      ),
+      term("select", "a_int", "a_string")
+    )
+
+    util.verifySql( // compares the plan for this query against `expected`
+      "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
+      expected
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index ce693ff..2ea15a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -56,6 +56,10 @@ abstract class TableTestUtil {
   def addTable[T: TypeInformation](name: String, fields: Expression*): Table
   def verifySql(query: String, expected: String): Unit
   def verifyTable(resultTable: Table, expected: String): Unit
+
+  // the print methods are for debugging purposes only
+  def printTable(resultTable: Table): Unit
+  def printSql(query: String): Unit
 }
 
 object TableTestUtil {
@@ -87,6 +91,7 @@ object TableTestUtil {
   def streamTableNode(idx: Int): String = {
     s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
   }
+
 }
 
 case class BatchTableTestUtil() extends TableTestUtil {
@@ -121,6 +126,16 @@ case class BatchTableTestUtil() extends TableTestUtil {
       expected.split("\n").map(_.trim).mkString("\n"),
       actual.split("\n").map(_.trim).mkString("\n"))
   }
+
+  def printTable(resultTable: Table): Unit = { // debugging only: prints the optimized plan
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    println(RelOptUtil.toString(optimized))
+  }
+
+  def printSql(query: String): Unit = { // debugging only: prints the plan of `query` via printTable
+    printTable(tEnv.sql(query))
+  }
 }
 
 case class StreamTableTestUtil() extends TableTestUtil {
@@ -156,4 +171,15 @@ case class StreamTableTestUtil() extends TableTestUtil {
       expected.split("\n").map(_.trim).mkString("\n"),
       actual.split("\n").map(_.trim).mkString("\n"))
   }
+
+  // the print methods are for debugging purposes only
+  def printTable(resultTable: Table): Unit = { // prints the optimized plan to stdout
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    println(RelOptUtil.toString(optimized))
+  }
+
+  def printSql(query: String): Unit = { // translates `query` with tEnv.sql, then printTable
+    printTable(tEnv.sql(query))
+  }
 }