Posted to issues@flink.apache.org by "Jane Chan (Jira)" <ji...@apache.org> on 2023/03/22 03:43:00 UTC

[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query

    [ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703466#comment-17703466 ] 

Jane Chan commented on FLINK-31165:
-----------------------------------

Hi [~rohankrao], thanks for reporting this issue.

The ORDER BY field SRC_NO is a constant, so it is folded away during query rewrite; the plan below shows ROW_NUMBER ordering by the literal 2022-01-01.

{code:java}
LogicalProject(inputs=[0..2])
+- LogicalFilter(condition=[<=($2, 1)])
   +- LogicalProject(inputs=[0..1], exprs=[[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY 2022-01-01 NULLS FIRST)]])
      +- LogicalTableScan(table=[[default_catalog, default_database, temp_table]]) {code}
From the perspective of SQL semantics, ordering ROW_NUMBER by a constant is meaningless: a constant never changes the sort result, so every row in a partition is a tie and the rank each row receives is arbitrary. As a workaround for now, please specify a real column as the ORDER BY field so that ROW_NUMBER sorts rows in a deterministic order.
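To make the tie-breaking concrete, here is a minimal Python sketch (plain Python, not Flink; the table data and the helper function are invented for illustration) of ROW_NUMBER semantics when the sort key is a constant:

```python
# Illustration only: simulate ROW_NUMBER() OVER (PARTITION BY ... ORDER BY <constant>).
# Every row in a partition compares equal on the sort key, so rank assignment
# degenerates to whatever order the rows happen to arrive in.
from itertools import groupby
from operator import itemgetter

rows = [("alice", 101), ("alice", 102), ("bob", 201), ("bob", 202)]

def row_number(rows, partition_key, order_key):
    """Return (row, rank) pairs, mimicking ROW_NUMBER over each partition."""
    ranked = []
    for _, part in groupby(sorted(rows, key=partition_key), key=partition_key):
        # Python's sort is stable, so a constant key leaves the input order untouched.
        for i, r in enumerate(sorted(part, key=order_key)):
            ranked.append((r, i + 1))
    return ranked

# Constant order key, like ORDER BY SRC_NO after constant folding.
ranked = row_number(rows, partition_key=itemgetter(0), order_key=lambda r: "2022-01-01")
top1 = [r for r, rn in ranked if rn <= 1]
print(top1)  # -> [('alice', 101), ('bob', 201)]: the "top" row is just arrival order
```

A real engine is free to pick a different row per partition; nothing in the query pins the result down.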

That said, I tested the query against MySQL and PostgreSQL, and both allow a constant ORDER BY here; they simply output the first inserted row per partition. Do you think we should align with this behavior? cc [~godfreyhe] [~lincoln.86xy]
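For comparison, the same query shape can be checked quickly with SQLite as an embeddable stand-in (SQLite 3.25+ supports window functions and, like MySQL/PostgreSQL in my test, accepts a constant ORDER BY; the table and data below are made up):

```python
# SQLite stand-in check (requires SQLite >= 3.25 for window functions):
# a constant ORDER BY inside OVER(...) is accepted, and the rownum <= 1
# filter returns one engine-chosen row per partition.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_table (name TEXT, rollno INTEGER)")
conn.executemany("INSERT INTO temp_table VALUES (?, ?)",
                 [("alice", 101), ("alice", 102), ("bob", 201)])

rows = conn.execute("""
    SELECT name, rollno FROM (
        SELECT name, rollno,
               ROW_NUMBER() OVER (PARTITION BY name ORDER BY '2022-01-01') AS rownum
        FROM temp_table
    ) WHERE rownum <= 1
""").fetchall()
print(rows)  # one row per partition; which row is unspecified by the query
```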

> Over Agg: The window rank function without order by error in top N query
> ------------------------------------------------------------------------
>
>                 Key: FLINK-31165
>                 URL: https://issues.apache.org/jira/browse/FLINK-31165
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: P Rohan Kumar
>            Priority: Major
>
>  
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
>
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("NAME", DataTypes.VARCHAR(2147483647))
>     .column("ROLLNO", DataTypes.DECIMAL(5, 0))
>     .column("DOB", DataTypes.DATE())
>     .column("CLASS", DataTypes.DECIMAL(2, 0))
>     .column("SUBJECT", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
>  {code}
> I get the below error when I run the above code, even though I have already provided an ORDER BY column.
> If I change the ORDER BY column to some other column, such as "SUBJECT", the job runs fine.
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args [rel#245:LogicalWindow.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
>     at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
>     at org.example.OverAggregateBug.main(OverAggregateBug.scala)
> Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 27 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)