You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/07/02 13:04:28 UTC
flink git commit: [FLINK-8650] [table] Add documentation and tests
for WINDOW clause.
Repository: flink
Updated Branches:
refs/heads/master 9d8b3cd3e -> 5755a1345
[FLINK-8650] [table] Add documentation and tests for WINDOW clause.
This closes #6226.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5755a134
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5755a134
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5755a134
Branch: refs/heads/master
Commit: 5755a13455479f0a37d98aa0b65c072d78a71d50
Parents: 9d8b3cd
Author: snuyanzin <sn...@gmail.com>
Authored: Thu Jun 28 19:19:25 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jul 2 14:47:44 2018 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 31 ++++-
.../table/api/stream/sql/OverWindowTest.scala | 120 +++++++++++++++++++
.../flink/table/utils/TableTestBase.scala | 13 ++
3 files changed, 160 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5755a134/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index da78a40..7279f40 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of supported SQL features in ba
{% highlight sql %}
+insert:
+ INSERT INTO tableReference
+ query
+
query:
values
| {
@@ -139,7 +143,8 @@ select:
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
-
+ [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
+
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
@@ -176,9 +181,20 @@ groupItem:
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
-insert:
- INSERT INTO tableReference
- query
+windowRef:
+ windowName
+ | windowSpec
+
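+windowSpec:
+ [ windowName ]
+ '('
+ [ PARTITION BY expression [, expression ]* ]
+ [ ORDER BY orderItem [, orderItem ]* ]
+ [
+ RANGE { numericOrIntervalExpression | UNBOUNDED } PRECEDING
+ | RANGE BETWEEN { numericOrIntervalExpression | UNBOUNDED } PRECEDING AND CURRENT ROW
+ | ROWS { numericExpression | UNBOUNDED } PRECEDING
+ | ROWS BETWEEN { numericExpression | UNBOUNDED } PRECEDING AND CURRENT ROW
+ ]
+ ')'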
{% endhighlight %}
@@ -302,6 +318,13 @@ SELECT COUNT(amount) OVER (
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
+
+SELECT COUNT(amount) OVER w, SUM(amount) OVER w
+FROM Orders
+WINDOW w AS (
+ PARTITION BY user
+ ORDER BY proctime
+ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
{% endhighlight %}
</td>
</tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/5755a134/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
index c8257b4..1be5810 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
@@ -45,6 +45,16 @@ class OverWindowTest extends TableTestBase {
"CURRENT ROW) as sum2 " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "b, " +
+ "count(a) OVER w as cnt1, " +
+ "sum(a) OVER w as sum1, " +
+ "count(DISTINCT a) OVER w as cnt2, " +
+ "sum(DISTINCT c) OVER w as sum2 " +
+ "from MyTable " +
+ "WINDOW w AS (PARTITION BY b ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -77,6 +87,15 @@ class OverWindowTest extends TableTestBase {
"CURRENT ROW) as sum1 " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(DISTINCT a) OVER w as cnt1, " +
+ "sum(DISTINCT a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as sum1 " +
+ "FROM MyTable " +
+ "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -108,6 +127,15 @@ class OverWindowTest extends TableTestBase {
"CURRENT ROW) as sum1 " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as cnt1, " +
+ "sum(a) OVER w as sum1 " +
+ "from MyTable " +
+ "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -137,6 +165,14 @@ class OverWindowTest extends TableTestBase {
" RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
"FROM MyTable"
+ val sqlQuery2 =
+ "SELECT a, " +
+ " AVG(c) OVER w AS avgA " +
+ "FROM MyTable " +
+ "WINDOW w AS (PARTITION BY a ORDER BY proctime " +
+ " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sqlQuery, sqlQuery2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -174,6 +210,14 @@ class OverWindowTest extends TableTestBase {
" RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
+ val sqlQuery2 =
+ "SELECT a, " +
+ " COUNT(c) OVER w " +
+ "FROM MyTable " +
+ "WINDOW w AS (ORDER BY proctime " +
+ " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sqlQuery, sqlQuery2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -202,6 +246,13 @@ class OverWindowTest extends TableTestBase {
"CURRENT ROW)" +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(a) OVER w " +
+ "FROM MyTable " +
+ "WINDOW w AS (ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -230,6 +281,14 @@ class OverWindowTest extends TableTestBase {
"sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(a) OVER w as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "FROM MyTable " +
+ "WINDOW w AS (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -258,6 +317,13 @@ class OverWindowTest extends TableTestBase {
"CURRENT ROW) " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(a) OVER w " +
+ "FROM MyTable " +
+ "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS UNBOUNDED preceding)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -282,6 +348,14 @@ class OverWindowTest extends TableTestBase {
"sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER w as cnt2 " +
+ "FROM MyTable " +
+ "WINDOW w AS(ORDER BY proctime RANGE UNBOUNDED preceding)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -309,6 +383,13 @@ class OverWindowTest extends TableTestBase {
"CURRENT ROW) " +
"from MyTable"
+ val sql2 = "SELECT " +
+ "c, " +
+ "count(a) OVER w " +
+ "FROM MyTable " +
+ "WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -569,4 +650,43 @@ class OverWindowTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}
+
+ // Two OVER aggregates on the same partition/order but with different ROWS frames
+ // (3 PRECEDING for SUM, 2 PRECEDING for MIN). Checks that writing the windows inline and
+ // declaring them as named windows (w1, w2) in a WINDOW clause give identical optimized plans,
+ // then checks the inline query's plan against the expected string below.
+ @Test
+ def testProcTimeBoundedPartitionedRowsOverDifferentWindows() = {
+ val sql = "SELECT " +
+ "a, " +
+ "SUM(c) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+ "MIN(c) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+ "FROM MyTable"
+
+ // Same query as `sql`, with both window specs moved into a WINDOW clause.
+ val sql2 = "SELECT " +
+ "a, " +
+ "SUM(c) OVER w1, " +
+ "MIN(c) OVER w2 " +
+ "FROM MyTable " +
+ "WINDOW w1 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)," +
+ "w2 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
+ streamUtil.verifySqlPlansIdentical(sql, sql2)
+
+ // SUM(c) shows up as COUNT(c) plus $SUM0(c), combined in the outer Calc as
+ // CASE(count > 0, CAST(sum0), null).
+ // NOTE(review): the DataStreamOverAggregate node below lists only the 3 PRECEDING frame and
+ // its aggregates, yet the outer DataStreamCalc also selects w1$o0 (presumably the MIN over the
+ // 2 PRECEDING frame). This suggests the node's explain output prints only its first window
+ // group — confirm before treating this string as a complete description of the plan.
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("partitionBy", "a"),
+ term("orderBy", "proctime"),
+ term("rows", "BETWEEN 3 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0",
+ "$SUM0(c) AS w0$o1")
+ ),
+ term("select", "a", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS EXPR$1", "w1$o0 AS EXPR$2")
+ )
+
+ streamUtil.verifySql(sql, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5755a134/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 30b67e7..f414dd4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -280,12 +280,25 @@ case class StreamTableTestUtil() extends TableTestUtil {
verifyTable(tableEnv.sqlQuery(query), expected)
}
+ // Asserts that every query in `queries` optimizes to the same plan as `query1`.
+ // `query1` is parsed once and reused as the reference for each comparison (see verify2Tables).
+ // If `queries` is empty, nothing is asserted.
+ def verifySqlPlansIdentical(query1: String, queries: String*): Unit = {
+ val resultTable1 = tableEnv.sqlQuery(query1)
+ queries.foreach(s => verify2Tables(resultTable1, tableEnv.sqlQuery(s)))
+ }
+
+ // Optimizes the table's RelNode with updatesAsRetraction = false and checks the optimized
+ // plan against `expected` via verifyString.
def verifyTable(resultTable: Table, expected: String): Unit = {
val relNode = resultTable.getRelNode
val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
verifyString(expected, optimized)
}
+ // Asserts that two tables have identical optimized plans. Each RelNode is optimized with
+ // updatesAsRetraction = false (as in verifyTable), and the plans are compared by their
+ // RelOptUtil.toString rendering; resultTable1's plan is treated as the expected value.
+ def verify2Tables(resultTable1: Table, resultTable2: Table): Unit = {
+ val relNode1 = resultTable1.getRelNode
+ val optimized1 = tableEnv.optimize(relNode1, updatesAsRetraction = false)
+ val relNode2 = resultTable2.getRelNode
+ val optimized2 = tableEnv.optimize(relNode2, updatesAsRetraction = false)
+ assertEquals(RelOptUtil.toString(optimized1), RelOptUtil.toString(optimized2))
+ }
+
+ // Parses `query` with javaTableEnv and checks the resulting table against `expected`
+ // using verifyJavaTable.
def verifyJavaSql(query: String, expected: String): Unit = {
verifyJavaTable(javaTableEnv.sqlQuery(query), expected)
}