You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jane Chan (Jira)" <ji...@apache.org> on 2023/03/22 03:50:00 UTC

[jira] [Comment Edited] (FLINK-31165) Over Agg: The window rank function without order by error in top N query

    [ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703466#comment-17703466 ] 

Jane Chan edited comment on FLINK-31165 at 3/22/23 3:49 AM:
------------------------------------------------------------

Hi [~rohankrao] , thanks for reporting this issue.

Actually, the order by field SRC_NO is a constant and is folded during query rewrite. 
{code:java}
LogicalProject(inputs=[0..2])
+- LogicalFilter(condition=[<=($2, 1)])
   +- LogicalProject(inputs=[0..1], exprs=[[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY 2022-01-01 NULLS FIRST)]])
      +- LogicalTableScan(table=[[default_catalog, default_database, temp_table]]) {code}
From the perspective of SQL semantics, using a constant as the order by field for row_number has no meaning, because the constant will not change the sorting result of row_number, and each row will get the same rank. As a current workaround, please try to specify another field to ensure that row_number is sorted in the correct order.

While I tested the query against MySQL, PostgreSQL, Spark, and Hive, they do allow this to happen, and just output the first inserted row. Do you think we need to align this behavior? cc [~godfreyhe]  [~lincoln.86xy] 

 

 


was (Author: qingyue):
Hi [~rohankrao] , thanks for reporting this issue.

Actually, the order by field SRC_NO is a constant and is folded during query rewrite. 

 
{code:java}
LogicalProject(inputs=[0..2])
+- LogicalFilter(condition=[<=($2, 1)])
   +- LogicalProject(inputs=[0..1], exprs=[[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY 2022-01-01 NULLS FIRST)]])
      +- LogicalTableScan(table=[[default_catalog, default_database, temp_table]]) {code}
From the perspective of SQL semantics, using a constant as the order by field for row_number has no meaning, because the constant will not change the sorting result of row_number, and each row will get the same rank. As a current workaround, please try to specify another field to ensure that row_number is sorted in the correct order.

While I tested the query against MySQL and PostgreSQL, they do allow this to happen, and just output the first inserted row. Do you think we need to align this behavior? cc [~godfreyhe]  [~lincoln.86xy] 

 

 

> Over Agg: The window rank function without order by error in top N query
> ------------------------------------------------------------------------
>
>                 Key: FLINK-31165
>                 URL: https://issues.apache.org/jira/browse/FLINK-31165
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: P Rohan Kumar
>            Priority: Major
>
>  
> {code:java}
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("NAME", DataTypes.VARCHAR(2147483647))
>     .column("ROLLNO", DataTypes.DECIMAL(5, 0))
>     .column("DOB", DataTypes.DATE())
>     .column("CLASS", DataTypes.DECIMAL(2, 0))
>     .column("SUBJECT", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
>  {code}
>  
>  
> I am getting the below error if I run the above code.
> I have already provided an order by column.
> If I change the order by column to some other column, such as "SUBJECT", then the job runs fine.
>  
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args [rel#245:LogicalWindow.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
>     at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
>     at org.example.OverAggregateBug.main(OverAggregateBug.scala)
> Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 27 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)