You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/29 13:42:20 UTC

flink git commit: [FLINK-6760] [table] Fix OverWindowTest alias test error

Repository: flink
Updated Branches:
  refs/heads/master 89f0ad90b -> 6b69c588d


[FLINK-6760] [table] Fix OverWindowTest alias test error

This closes #4007.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b69c588
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b69c588
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b69c588

Branch: refs/heads/master
Commit: 6b69c588df866c7b1694a58a433f7957bee456c6
Parents: 89f0ad9
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon May 29 19:10:58 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon May 29 15:41:30 2017 +0200

----------------------------------------------------------------------
 .../api/scala/stream/sql/OverWindowITCase.scala | 32 +++++++++-----------
 .../api/scala/stream/sql/OverWindowTest.scala   | 25 +++++++--------
 2 files changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b69c588/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 7ba5c16..36eff1e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -63,8 +63,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -87,9 +87,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, " +
       "  SUM(c) OVER (" +
-      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC, " +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
       "  MIN(c) OVER (" +
-      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -130,9 +130,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, " +
       "  SUM(c) OVER (" +
-      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
       "  MIN(c) OVER (" +
-      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
@@ -173,8 +173,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -198,11 +198,12 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val sqlQuery = "SELECT " +
+    val sqlQuery = "SELECT c, cnt1 from " +
+      "(SELECT " +
       "c, " +
       "count(a) " +
-      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
-      "from T1"
+      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+      "as cnt1 from T1)"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
@@ -230,8 +231,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT " +
       "c, " +
-      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -256,7 +257,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
@@ -777,9 +778,6 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-//  <<<<<<< HEAD
-
-
   /** test sliding event-time unbounded window with partition by **/
   @Test
   def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b69c588/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
index 711b31b..a79d48f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
@@ -35,7 +35,9 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
+      "CURRENT ROW) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as sum1 " +
       "from MyTable"
 
     val expected =
@@ -51,9 +53,9 @@ class OverWindowTest extends TableTestBase {
           term("partitionBy", "c"),
           term("orderBy", "proctime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1")
         ),
-        term("select", "c", "w0$o0 AS $1")
+        term("select", "c", "w0$o0 AS cnt1, CASE(>(w0$o0, 0), CAST(w0$o1), null) AS sum1")
       )
     streamUtil.verifySql(sql, expected)
   }
@@ -101,8 +103,8 @@ class OverWindowTest extends TableTestBase {
     val sqlQuery =
       "SELECT a, " +
         "  COUNT(c) OVER (ORDER BY proctime " +
-        "    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
-      "FROM MyTable"
+        "    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " +
+        "FROM MyTable"
 
     val expected =
       unaryNode(
@@ -129,7 +131,7 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
+      "CURRENT ROW)" +
       "from MyTable"
 
     val expected =
@@ -185,7 +187,7 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) as cnt1 " +
+      "CURRENT ROW) " +
       "from MyTable"
 
     val expected =
@@ -236,7 +238,7 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) as cnt1 " +
+      "CURRENT ROW) " +
       "from MyTable"
 
     val expected =
@@ -259,7 +261,7 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
+      "CURRENT ROW) " +
       "from MyTable"
 
     val expected =
@@ -287,7 +289,7 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
+      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) " +
       "from MyTable"
 
     val expected =
@@ -315,7 +317,7 @@ class OverWindowTest extends TableTestBase {
     val sql = "SELECT " +
       "c, " +
       "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-      "CURRENT ROW) as cnt1 " +
+      "CURRENT ROW) " +
       "from MyTable"
 
     val expected =
@@ -499,5 +501,4 @@ class OverWindowTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
-
 }