You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/29 13:42:20 UTC
flink git commit: [FLINK-6760] [table] Fix OverWindowTest alias test
error
Repository: flink
Updated Branches:
refs/heads/master 89f0ad90b -> 6b69c588d
[FLINK-6760] [table] Fix OverWindowTest alias test error
This closes #4007.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b69c588
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b69c588
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b69c588
Branch: refs/heads/master
Commit: 6b69c588df866c7b1694a58a433f7957bee456c6
Parents: 89f0ad9
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon May 29 19:10:58 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon May 29 15:41:30 2017 +0200
----------------------------------------------------------------------
.../api/scala/stream/sql/OverWindowITCase.scala | 32 +++++++++-----------
.../api/scala/stream/sql/OverWindowTest.scala | 25 +++++++--------
2 files changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b69c588/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 7ba5c16..36eff1e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -63,8 +63,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -87,9 +87,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, " +
" SUM(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC, " +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
" MIN(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -130,9 +130,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, " +
" SUM(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
" MIN(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+ " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
@@ -173,8 +173,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -198,11 +198,12 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val sqlQuery = "SELECT " +
+ val sqlQuery = "SELECT c, cnt1 from " +
+ "(SELECT " +
"c, " +
"count(a) " +
- " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
- "from T1"
+ " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+ "as cnt1 from T1)"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
@@ -230,8 +231,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -256,7 +257,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -777,9 +778,6 @@ class OverWindowITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
-// <<<<<<< HEAD
-
-
/** test sliding event-time unbounded window with partition by **/
@Test
def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6b69c588/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
index 711b31b..a79d48f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
@@ -35,7 +35,9 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW) as cnt1 " +
+ "CURRENT ROW) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as sum1 " +
"from MyTable"
val expected =
@@ -51,9 +53,9 @@ class OverWindowTest extends TableTestBase {
term("partitionBy", "c"),
term("orderBy", "proctime"),
term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1")
),
- term("select", "c", "w0$o0 AS $1")
+ term("select", "c", "w0$o0 AS cnt1, CASE(>(w0$o0, 0), CAST(w0$o1), null) AS sum1")
)
streamUtil.verifySql(sql, expected)
}
@@ -101,8 +103,8 @@ class OverWindowTest extends TableTestBase {
val sqlQuery =
"SELECT a, " +
" COUNT(c) OVER (ORDER BY proctime " +
- " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
- "FROM MyTable"
+ " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " +
+ "FROM MyTable"
val expected =
unaryNode(
@@ -129,7 +131,7 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW) as cnt1 " +
+ "CURRENT ROW)" +
"from MyTable"
val expected =
@@ -185,7 +187,7 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW) as cnt1 " +
+ "CURRENT ROW) " +
"from MyTable"
val expected =
@@ -236,7 +238,7 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW) as cnt1 " +
+ "CURRENT ROW) " +
"from MyTable"
val expected =
@@ -259,7 +261,7 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) as cnt1 " +
+ "CURRENT ROW) " +
"from MyTable"
val expected =
@@ -287,7 +289,7 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (PARTITION BY c ORDER BY rowtime " +
- "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
+ "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) " +
"from MyTable"
val expected =
@@ -315,7 +317,7 @@ class OverWindowTest extends TableTestBase {
val sql = "SELECT " +
"c, " +
"count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) as cnt1 " +
+ "CURRENT ROW) " +
"from MyTable"
val expected =
@@ -499,5 +501,4 @@ class OverWindowTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}
-
}