Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 02:00:57 UTC

[flink] branch master updated: [FLINK-18440][table-planner-blink] ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9723cb2  [FLINK-18440][table-planner-blink] ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
9723cb2 is described below

commit 9723cb2090cff1c68e878282600fcd06c4f34125
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Fri Jul 10 19:03:47 2020 +0800

    [FLINK-18440][table-planner-blink] ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
    
    This closes #12868
---
 .../operations/SqlToOperationConverter.java        | 11 +++++--
 .../table/planner/plan/batch/sql/RankTest.xml      | 29 ++++++++++++++++++
 .../table/planner/plan/stream/sql/RankTest.xml     | 26 +++++++++++++++++
 .../table/planner/plan/batch/sql/RankTest.scala    | 34 ++++++++++++++++++++++
 .../table/planner/plan/stream/sql/RankTest.scala   | 34 ++++++++++++++++++++++
 5 files changed, 132 insertions(+), 2 deletions(-)
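
For context, this commit fixes a user-facing failure: a view whose OVER clause had no explicit frame was stored with a default ROWS frame injected during sql-to-rel conversion, so any later query against the view failed validation with "ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions". Below is a minimal, hypothetical repro sketch; the class name RowNumberViewRepro is illustrative only, and it assumes the Flink 1.11 Table API with the bundled datagen connector:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowNumberViewRepro {
    public static void main(String[] args) {
        // Blink planner in streaming mode, matching the stream test below.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE test_source (name STRING, eat STRING, age BIGINT) "
                + "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE VIEW view1 AS "
                + "SELECT name, eat, SUM(age) AS cnt FROM test_source GROUP BY name, eat");
        // The OVER clause has no explicit frame; before this fix, the expanded
        // view text stored in the catalog gained a default ROWS frame.
        tEnv.executeSql("CREATE VIEW view2 AS SELECT *, "
                + "ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) AS row_num "
                + "FROM view1");
        // Querying the view re-validates the stored text; this used to fail with
        // "ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions".
        tEnv.sqlQuery("SELECT name, eat, cnt FROM view2 WHERE row_num <= 3");
    }
}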

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index be54497..fa04539 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -659,6 +659,15 @@ public class SqlToOperationConverter {
 		final SqlNodeList fieldList = sqlCreateView.getFieldList();
 
 		SqlNode validateQuery = flinkPlanner.validate(query);
+		// Put the SQL string unparsing (getQuotedSqlString()) before the node
+		// conversion (toQueryOperation()), because before Calcite 1.22.0 the
+		// sql-to-rel conversion would mutate unspecified (null) SqlWindow bounds
+		// to their defaults.
+
+		// This bug is fixed by CALCITE-3877 in Calcite 1.23.0.
+		String originalQuery = getQuotedSqlString(query);
+		String expandedQuery = getQuotedSqlString(validateQuery);
+
 		PlannerQueryOperation operation = toQueryOperation(flinkPlanner, validateQuery);
 		TableSchema schema = operation.getTableSchema();
 
@@ -681,8 +690,6 @@ public class SqlToOperationConverter {
 			schema = TableSchema.builder().fields(aliasFieldNames, inputFieldTypes).build();
 		}
 
-		String originalQuery = getQuotedSqlString(query);
-		String expandedQuery = getQuotedSqlString(validateQuery);
 		String comment = sqlCreateView.getComment().map(c -> c.getNlsString().getValue()).orElse(null);
 		CatalogView catalogView = new CatalogViewImpl(originalQuery,
 				expandedQuery,
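
The reordering above matters because getQuotedSqlString() reads mutable state on the SqlNode tree that toQueryOperation() could still change at this point. The following is a self-contained, hypothetical Java illustration of that pitfall; WindowNode and convert() are simplified stand-ins for Calcite's SqlWindow and sql-to-rel conversion, not real API:

final class UnparseBeforeConvert {
    // Stand-in for a parsed OVER clause whose frame may be unspecified (null).
    static final class WindowNode {
        String frame; // null means "not specified by the user"

        String unparse(String fn) {
            return fn + "() OVER (PARTITION BY name ORDER BY cnt DESC"
                    + (frame == null ? "" : " " + frame) + ")";
        }
    }

    // Stand-in for a conversion pass that (pre-CALCITE-3877) mutated
    // null bounds to the default ROWS frame as a side effect.
    static void convert(WindowNode w) {
        if (w.frame == null) {
            w.frame = "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW";
        }
    }

    public static void main(String[] args) {
        WindowNode w = new WindowNode();
        String before = w.unparse("ROW_NUMBER"); // capture text first: no frame
        convert(w);                              // conversion mutates the node
        String after = w.unparse("ROW_NUMBER");  // now carries the illegal frame
        System.out.println(before);
        System.out.println(after);
    }
}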
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
index b1cc4aa..743263c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
@@ -214,4 +214,33 @@ Calc(select=[a, b, $2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCreateViewWithRowNumber">
+    <Resource name="sql">
+      <![CDATA[insert into sink select name, eat, cnt
+from view2 where row_num <= 3]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
++- LogicalProject(name=[$0], eat=[$1], cnt=[$2])
+   +- LogicalFilter(condition=[<=($3, 3)])
+      +- LogicalProject(name=[$0], eat=[$1], cnt=[$2], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+         +- LogicalAggregate(group=[{0, 1}], cnt=[SUM($2)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
++- Calc(select=[name, eat, cnt], where=[<=(w0$o0, 3)])
+   +- OverAggregate(partitionBy=[name], orderBy=[cnt DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[name, eat, cnt, w0$o0])
+      +- Sort(orderBy=[name ASC, cnt DESC])
+         +- Exchange(distribution=[hash[name]])
+            +- HashAggregate(isMerge=[true], groupBy=[name, eat], select=[name, eat, Final_SUM(sum$0) AS cnt])
+               +- Exchange(distribution=[hash[name, eat]])
+                  +- LocalHashAggregate(groupBy=[name, eat], select=[name, eat, Partial_SUM(age) AS sum$0])
+                     +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[name, eat, age])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 208b241..7f4c655 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -817,4 +817,30 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCreateViewWithRowNumber">
+    <Resource name="sql">
+      <![CDATA[insert into sink select name, eat, cnt
+from view2 where row_num <= 3]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
++- LogicalProject(name=[$0], eat=[$1], cnt=[$2])
+   +- LogicalFilter(condition=[<=($3, 3)])
+      +- LogicalProject(name=[$0], eat=[$1], cnt=[$2], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+         +- LogicalAggregate(group=[{0, 1}], cnt=[SUM($2)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
++- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[name], orderBy=[cnt DESC], select=[name, eat, cnt])
+   +- Exchange(distribution=[hash[name]])
+      +- GroupAggregate(groupBy=[name, eat], select=[name, eat, SUM(age) AS cnt])
+         +- Exchange(distribution=[hash[name, eat]])
+            +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[name, eat, age])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
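
The planBefore/planAfter resources above are recorded by the planner test harness. Outside the harness, a similar plan dump can be obtained with TableEnvironment#explainSql (added in Flink 1.11 by FLIP-84); here is a hedged sketch under the same assumptions as the repro above (datagen source, print sink; both connectors ship with Flink 1.11):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainRowNumberView {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql("CREATE TABLE test_source (name STRING, eat STRING, age BIGINT) "
                + "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink (name STRING, eat STRING, cnt BIGINT) "
                + "WITH ('connector' = 'print')");
        tEnv.executeSql("CREATE VIEW view1 AS "
                + "SELECT name, eat, SUM(age) AS cnt FROM test_source GROUP BY name, eat");
        tEnv.executeSql("CREATE VIEW view2 AS SELECT *, "
                + "ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) AS row_num "
                + "FROM view1");
        // Prints the abstract syntax tree and the optimized physical plan,
        // analogous to the planBefore/planAfter resources above.
        System.out.println(tEnv.explainSql(
                "INSERT INTO sink SELECT name, eat, cnt FROM view2 WHERE row_num <= 3"));
    }
}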
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala
index 62fb3f6..01e89f5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala
@@ -167,4 +167,38 @@ class RankTest extends TableTestBase {
       """.stripMargin
     util.verifyPlan(sqlQuery)
   }
+
+  @Test
+  def testCreateViewWithRowNumber(): Unit = {
+    util.addTable(
+      """
+        |CREATE TABLE test_source (
+        |  name STRING,
+        |  eat STRING,
+        |  age BIGINT
+        |) WITH (
+        |  'connector' = 'values',
+        |  'bounded' = 'true'
+        |)
+      """.stripMargin)
+    util.tableEnv.executeSql("create view view1 as select name, eat, sum(age) as cnt\n"
+      + "from test_source group by name, eat")
+    util.tableEnv.executeSql("create view view2 as\n"
+      + "select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as row_num\n"
+      + "from view1")
+    util.addTable(
+      s"""
+         |create table sink (
+         |  name varchar,
+         |  eat varchar,
+         |  cnt bigint
+         |)
+         |with (
+         |  'connector' = 'print'
+         |)
+         |""".stripMargin
+    )
+    util.verifyPlanInsert("insert into sink select name, eat, cnt\n"
+      + "from view2 where row_num <= 3")
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 37cbb21..064cbf3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -638,5 +638,39 @@ class RankTest extends TableTestBase {
     util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testCreateViewWithRowNumber(): Unit = {
+    util.addTable(
+      """
+        |CREATE TABLE test_source (
+        |  name STRING,
+        |  eat STRING,
+        |  age BIGINT
+        |) WITH (
+        |  'connector' = 'values',
+        |  'bounded' = 'false'
+        |)
+      """.stripMargin)
+    util.tableEnv.executeSql("create view view1 as select name, eat, sum(age) as cnt\n"
+      + "from test_source group by name, eat")
+    util.tableEnv.executeSql("create view view2 as\n"
+      + "select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as row_num\n"
+      + "from view1")
+    util.addTable(
+      s"""
+         |create table sink (
+         |  name varchar,
+         |  eat varchar,
+         |  cnt bigint
+         |)
+         |with (
+         |  'connector' = 'print'
+         |)
+         |""".stripMargin
+    )
+    util.verifyPlanInsert("insert into sink select name, eat, cnt\n"
+      + "from view2 where row_num <= 3")
+  }
+
   // TODO add tests about multi-sinks and udf
 }