You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2023/03/29 10:26:00 UTC

[jira] [Updated] (FLINK-31165) Over Agg: The window rank function without order by error in top N query

     [ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln lee updated FLINK-31165:
--------------------------------
    Fix Version/s: 1.18.0
                   1.17.1

> Over Agg: The window rank function without order by error in top N query
> ------------------------------------------------------------------------
>
>                 Key: FLINK-31165
>                 URL: https://issues.apache.org/jira/browse/FLINK-31165
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: P Rohan Kumar
>            Assignee: Jane Chan
>            Priority: Major
>             Fix For: 1.18.0, 1.17.1
>
>
>  
> {code:java}
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("NAME", DataTypes.VARCHAR(2147483647))
>     .column("ROLLNO", DataTypes.DECIMAL(5, 0))
>     .column("DOB", DataTypes.DATE())
>     .column("CLASS", DataTypes.DECIMAL(2, 0))
>     .column("SUBJECT", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
>  {code}
>  
>  
> I am getting the below error if I run the above code.
> I have already provided an order by column.
> If I change the order by column to some other column, such as "SUBJECT", then the job runs fine.
>  
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args [rel#245:LogicalWindow.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
>     at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
>     at org.example.OverAggregateBug.main(OverAggregateBug.scala)
> Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 27 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)