Posted to commits@flink.apache.org by fh...@apache.org on 2018/07/02 13:04:28 UTC

flink git commit: [FLINK-8650] [table] Add documentation and tests for WINDOW clause.

Repository: flink
Updated Branches:
  refs/heads/master 9d8b3cd3e -> 5755a1345


[FLINK-8650] [table] Add documentation and tests for WINDOW clause.

This closes #6226.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5755a134
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5755a134
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5755a134

Branch: refs/heads/master
Commit: 5755a13455479f0a37d98aa0b65c072d78a71d50
Parents: 9d8b3cd
Author: snuyanzin <sn...@gmail.com>
Authored: Thu Jun 28 19:19:25 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jul 2 14:47:44 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  31 ++++-
 .../table/api/stream/sql/OverWindowTest.scala   | 120 +++++++++++++++++++
 .../flink/table/utils/TableTestBase.scala       |  13 ++
 3 files changed, 160 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5755a134/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index da78a40..7279f40 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of supported SQL features in ba
 
 {% highlight sql %}
 
+insert:
+  INSERT INTO tableReference
+  query
+  
 query:
   values
   | {
@@ -139,7 +143,8 @@ select:
   [ WHERE booleanExpression ]
   [ GROUP BY { groupItem [, groupItem ]* } ]
   [ HAVING booleanExpression ]
-
+  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
+  
 selectWithoutFrom:
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
@@ -176,9 +181,20 @@ groupItem:
   | ROLLUP '(' expression [, expression ]* ')'
   | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
-insert:
-  INSERT INTO tableReference
-  query
+windowRef:
+    windowName
+  | windowSpec
+
+windowSpec:
+    [ windowName ]
+    '('
+    [ ORDER BY orderItem [, orderItem ]* ]
+    [ PARTITION BY expression [, expression ]* ]
+    [
+        RANGE numericOrIntervalExpression {PRECEDING}
+      | ROWS numericExpression {PRECEDING}
+    ]
+    ')'
 
 {% endhighlight %}
 
@@ -302,6 +318,13 @@ SELECT COUNT(amount) OVER (
   ORDER BY proctime
   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
 FROM Orders
+
+SELECT COUNT(amount) OVER w, SUM(amount) OVER w
+FROM Orders 
+WINDOW w AS (
+  PARTITION BY user
+  ORDER BY proctime
+  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)  
 {% endhighlight %}
       </td>
     </tr>
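
The sql.md change above documents the standard SQL WINDOW clause: an OVER
window is defined once, given a name, and then referenced by several
aggregates. As a minimal sketch of how the documented query could be submitted
through the Scala Table API (the table "Orders" with fields user, amount and a
proctime attribute is assumed to be registered as in the docs example; the
object and method names are illustrative, and the import paths are assumed
from the flink-table module on this branch):

  import org.apache.flink.table.api.Table
  import org.apache.flink.table.api.scala.StreamTableEnvironment

  object WindowClauseExample {
    // Both aggregates reuse the window named 'w' that is defined once
    // in the WINDOW clause at the end of the query.
    def countAndSumPerUser(tEnv: StreamTableEnvironment): Table = {
      tEnv.sqlQuery(
        """SELECT COUNT(amount) OVER w, SUM(amount) OVER w
          |FROM Orders
          |WINDOW w AS (
          |  PARTITION BY user
          |  ORDER BY proctime
          |  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)""".stripMargin)
    }
  }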

http://git-wip-us.apache.org/repos/asf/flink/blob/5755a134/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
index c8257b4..1be5810 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
@@ -45,6 +45,16 @@ class OverWindowTest extends TableTestBase {
       "CURRENT ROW) as sum2 " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "b, " +
+      "count(a) OVER w as cnt1, " +
+      "sum(a) OVER w as sum1, " +
+      "count(DISTINCT a) OVER w as cnt2, " +
+      "sum(DISTINCT c) OVER w as sum2 " +
+      "from MyTable " +
+      "WINDOW w AS (PARTITION BY b ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+    
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -77,6 +87,15 @@ class OverWindowTest extends TableTestBase {
       "CURRENT ROW) as sum1 " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(DISTINCT a) OVER w as cnt1, " +
+      "sum(DISTINCT a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as sum1 " +
+      "FROM MyTable " +
+      "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -108,6 +127,15 @@ class OverWindowTest extends TableTestBase {
       "CURRENT ROW) as sum1 " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1, " +
+      "sum(a) OVER w as sum1 " +
+      "from MyTable " +
+      "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+    
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -137,6 +165,14 @@ class OverWindowTest extends TableTestBase {
         "    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
         "FROM MyTable"
 
+    val sqlQuery2 =
+      "SELECT a, " +
+        "  AVG(c) OVER w AS avgA " +
+        "FROM MyTable " +
+        "WINDOW w AS (PARTITION BY a ORDER BY proctime " +
+        "  RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sqlQuery, sqlQuery2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -174,6 +210,14 @@ class OverWindowTest extends TableTestBase {
         "    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " +
         "FROM MyTable"
 
+    val sqlQuery2 =
+      "SELECT a, " +
+        "  COUNT(c) OVER w " +
+        "FROM MyTable " +
+        "WINDOW w AS (ORDER BY proctime " +
+        "  RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sqlQuery, sqlQuery2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -202,6 +246,13 @@ class OverWindowTest extends TableTestBase {
       "CURRENT ROW)" +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(a) OVER w " +
+      "FROM MyTable " +
+      "WINDOW w AS (ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -230,6 +281,14 @@ class OverWindowTest extends TableTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(a) OVER w as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "FROM MyTable " +
+      "WINDOW w AS (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -258,6 +317,13 @@ class OverWindowTest extends TableTestBase {
       "CURRENT ROW) " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(a) OVER w " +
+      "FROM MyTable " +
+      "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS UNBOUNDED preceding)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -282,6 +348,14 @@ class OverWindowTest extends TableTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER w as cnt2 " +
+      "FROM MyTable " +
+      "WINDOW w AS(ORDER BY proctime RANGE UNBOUNDED preceding)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -309,6 +383,13 @@ class OverWindowTest extends TableTestBase {
       "CURRENT ROW) " +
       "from MyTable"
 
+    val sql2 = "SELECT " +
+      "c, " +
+      "count(a) OVER w " +
+      "FROM MyTable " +
+      "WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -569,4 +650,43 @@ class OverWindowTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOverDifferentWindows() = {
+    val sql = "SELECT " +
+      "a, " +
+      "SUM(c) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+      "MIN(c) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val sql2 = "SELECT " +
+      "a, " +
+      "SUM(c) OVER w1, " +
+      "MIN(c) OVER w2 " +
+      "FROM MyTable " +
+      "WINDOW w1 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)," +
+      "w2 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
+    streamUtil.verifySqlPlansIdentical(sql, sql2)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 3 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0",
+            "$SUM0(c) AS w0$o1")
+        ),
+        term("select", "a", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS EXPR$1", "w1$o0 AS EXPR$2")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5755a134/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 30b67e7..f414dd4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -280,12 +280,25 @@ case class StreamTableTestUtil() extends TableTestUtil {
     verifyTable(tableEnv.sqlQuery(query), expected)
   }
 
+  def verifySqlPlansIdentical(query1: String, queries: String*): Unit = {
+    val resultTable1 = tableEnv.sqlQuery(query1)
+    queries.foreach(s => verify2Tables(resultTable1, tableEnv.sqlQuery(s)))
+  }
+
   def verifyTable(resultTable: Table, expected: String): Unit = {
     val relNode = resultTable.getRelNode
     val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
     verifyString(expected, optimized)
   }
 
+  def verify2Tables(resultTable1: Table, resultTable2: Table): Unit = {
+    val relNode1 = resultTable1.getRelNode
+    val optimized1 = tableEnv.optimize(relNode1, updatesAsRetraction = false)
+    val relNode2 = resultTable2.getRelNode
+    val optimized2 = tableEnv.optimize(relNode2, updatesAsRetraction = false)
+    assertEquals(RelOptUtil.toString(optimized1), RelOptUtil.toString(optimized2))
+  }
+
   def verifyJavaSql(query: String, expected: String): Unit = {
     verifyJavaTable(javaTableEnv.sqlQuery(query), expected)
   }