You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2024/03/11 12:39:05 UTC

[jira] [Updated] (FLINK-28764) Support more than 64 distinct aggregate function calls in one aggregate SQL query

     [ https://issues.apache.org/jira/browse/FLINK-28764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln lee updated FLINK-28764:
--------------------------------
    Fix Version/s: 1.20.0

> Support more than 64 distinct aggregate function calls in one aggregate SQL query
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-28764
>                 URL: https://issues.apache.org/jira/browse/FLINK-28764
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.6, 1.14.5, 1.15.1
>            Reporter: Wei Zhong
>            Assignee: Wei Zhong
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>             Fix For: 1.19.0, 1.20.0
>
>
> Currently, Flink SQL does not support more than 64 distinct aggregate function calls in one aggregate SQL query. We encountered this problem while migrating batch jobs from Spark to Flink. The Spark job has 79 distinct aggregate function calls in one aggregate SQL query.
> Code to reproduce:
> {code:java}
> public class Test64Distinct {
>     public static void main(String[] args) {
>         TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
>         tableEnv.executeSql("create table datagen_source(id BIGINT, val BIGINT) with " +
>                 "('connector'='datagen', 'number-of-rows'='1000')");
>         tableEnv.executeSql("select " +
>                 "count(distinct val * 1), " +
>                 "count(distinct val * 2), " +
>                 "count(distinct val * 3), " +
>                 "count(distinct val * 4), " +
>                 "count(distinct val * 5), " +
>                 "count(distinct val * 6), " +
>                 "count(distinct val * 7), " +
>                 "count(distinct val * 8), " +
>                 "count(distinct val * 9), " +
>                 "count(distinct val * 10), " +
>                 "count(distinct val * 11), " +
>                 "count(distinct val * 12), " +
>                 "count(distinct val * 13), " +
>                 "count(distinct val * 14), " +
>                 "count(distinct val * 15), " +
>                 "count(distinct val * 16), " +
>                 "count(distinct val * 17), " +
>                 "count(distinct val * 18), " +
>                 "count(distinct val * 19), " +
>                 "count(distinct val * 20), " +
>                 "count(distinct val * 21), " +
>                 "count(distinct val * 22), " +
>                 "count(distinct val * 23), " +
>                 "count(distinct val * 24), " +
>                 "count(distinct val * 25), " +
>                 "count(distinct val * 26), " +
>                 "count(distinct val * 27), " +
>                 "count(distinct val * 28), " +
>                 "count(distinct val * 29), " +
>                 "count(distinct val * 30), " +
>                 "count(distinct val * 31), " +
>                 "count(distinct val * 32), " +
>                 "count(distinct val * 33), " +
>                 "count(distinct val * 34), " +
>                 "count(distinct val * 35), " +
>                 "count(distinct val * 36), " +
>                 "count(distinct val * 37), " +
>                 "count(distinct val * 38), " +
>                 "count(distinct val * 39), " +
>                 "count(distinct val * 40), " +
>                 "count(distinct val * 41), " +
>                 "count(distinct val * 42), " +
>                 "count(distinct val * 43), " +
>                 "count(distinct val * 44), " +
>                 "count(distinct val * 45), " +
>                 "count(distinct val * 46), " +
>                 "count(distinct val * 47), " +
>                 "count(distinct val * 48), " +
>                 "count(distinct val * 49), " +
>                 "count(distinct val * 50), " +
>                 "count(distinct val * 51), " +
>                 "count(distinct val * 52), " +
>                 "count(distinct val * 53), " +
>                 "count(distinct val * 54), " +
>                 "count(distinct val * 55), " +
>                 "count(distinct val * 56), " +
>                 "count(distinct val * 57), " +
>                 "count(distinct val * 58), " +
>                 "count(distinct val * 59), " +
>                 "count(distinct val * 60), " +
>                 "count(distinct val * 61), " +
>                 "count(distinct val * 62), " +
>                 "count(distinct val * 63), " +
>                 "count(distinct val * 64), " +
>                 "count(distinct val * 65) from datagen_source").print();
>     }
> } {code}
> Exception:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Sql optimization: Cannot generate a valid execution plan for the given query: LogicalSink(table=[*anonymous_collect$1*], fields=[EXPR$0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, EXPR$8, EXPR$9, EXPR$10, EXPR$11, EXPR$12, EXPR$13, EXPR$14, EXPR$15, EXPR$16, EXPR$17, EXPR$18, EXPR$19, EXPR$20, EXPR$21, EXPR$22, EXPR$23, EXPR$24, EXPR$25, EXPR$26, EXPR$27, EXPR$28, EXPR$29, EXPR$30, EXPR$31, EXPR$32, EXPR$33, EXPR$34, EXPR$35, EXPR$36, EXPR$37, EXPR$38, EXPR$39, EXPR$40, EXPR$41, EXPR$42, EXPR$43, EXPR$44, EXPR$45, EXPR$46, EXPR$47, EXPR$48, EXPR$49, EXPR$50, EXPR$51, EXPR$52, EXPR$53, EXPR$54, EXPR$55, EXPR$56, EXPR$57, EXPR$58, EXPR$59, EXPR$60, EXPR$61, EXPR$62, EXPR$63, EXPR$64])
> +- LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $2)], EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $4)], EXPR$5=[COUNT(DISTINCT $5)], EXPR$6=[COUNT(DISTINCT $6)], EXPR$7=[COUNT(DISTINCT $7)], EXPR$8=[COUNT(DISTINCT $8)], EXPR$9=[COUNT(DISTINCT $9)], EXPR$10=[COUNT(DISTINCT $10)], EXPR$11=[COUNT(DISTINCT $11)], EXPR$12=[COUNT(DISTINCT $12)], EXPR$13=[COUNT(DISTINCT $13)], EXPR$14=[COUNT(DISTINCT $14)], EXPR$15=[COUNT(DISTINCT $15)], EXPR$16=[COUNT(DISTINCT $16)], EXPR$17=[COUNT(DISTINCT $17)], EXPR$18=[COUNT(DISTINCT $18)], EXPR$19=[COUNT(DISTINCT $19)], EXPR$20=[COUNT(DISTINCT $20)], EXPR$21=[COUNT(DISTINCT $21)], EXPR$22=[COUNT(DISTINCT $22)], EXPR$23=[COUNT(DISTINCT $23)], EXPR$24=[COUNT(DISTINCT $24)], EXPR$25=[COUNT(DISTINCT $25)], EXPR$26=[COUNT(DISTINCT $26)], EXPR$27=[COUNT(DISTINCT $27)], EXPR$28=[COUNT(DISTINCT $28)], EXPR$29=[COUNT(DISTINCT $29)], EXPR$30=[COUNT(DISTINCT $30)], EXPR$31=[COUNT(DISTINCT $31)], EXPR$32=[COUNT(DISTINCT $32)], EXPR$33=[COUNT(DISTINCT $33)], EXPR$34=[COUNT(DISTINCT $34)], EXPR$35=[COUNT(DISTINCT $35)], EXPR$36=[COUNT(DISTINCT $36)], EXPR$37=[COUNT(DISTINCT $37)], EXPR$38=[COUNT(DISTINCT $38)], EXPR$39=[COUNT(DISTINCT $39)], EXPR$40=[COUNT(DISTINCT $40)], EXPR$41=[COUNT(DISTINCT $41)], EXPR$42=[COUNT(DISTINCT $42)], EXPR$43=[COUNT(DISTINCT $43)], EXPR$44=[COUNT(DISTINCT $44)], EXPR$45=[COUNT(DISTINCT $45)], EXPR$46=[COUNT(DISTINCT $46)], EXPR$47=[COUNT(DISTINCT $47)], EXPR$48=[COUNT(DISTINCT $48)], EXPR$49=[COUNT(DISTINCT $49)], EXPR$50=[COUNT(DISTINCT $50)], EXPR$51=[COUNT(DISTINCT $51)], EXPR$52=[COUNT(DISTINCT $52)], EXPR$53=[COUNT(DISTINCT $53)], EXPR$54=[COUNT(DISTINCT $54)], EXPR$55=[COUNT(DISTINCT $55)], EXPR$56=[COUNT(DISTINCT $56)], EXPR$57=[COUNT(DISTINCT $57)], EXPR$58=[COUNT(DISTINCT $58)], EXPR$59=[COUNT(DISTINCT $59)], EXPR$60=[COUNT(DISTINCT $60)], EXPR$61=[COUNT(DISTINCT $61)], EXPR$62=[COUNT(DISTINCT $62)], EXPR$63=[COUNT(DISTINCT $63)], 
> EXPR$64=[COUNT(DISTINCT $64)])
>    +- LogicalProject(exprs=[[*($1, 1), *($1, 2), *($1, 3), *($1, 4), *($1, 5), *($1, 6), *($1, 7), *($1, 8), *($1, 9), *($1, 10), *($1, 11), *($1, 12), *($1, 13), *($1, 14), *($1, 15), *($1, 16), *($1, 17), *($1, 18), *($1, 19), *($1, 20), *($1, 21), *($1, 22), *($1, 23), *($1, 24), *($1, 25), *($1, 26), *($1, 27), *($1, 28), *($1, 29), *($1, 30), *($1, 31), *($1, 32), *($1, 33), *($1, 34), *($1, 35), *($1, 36), *($1, 37), *($1, 38), *($1, 39), *($1, 40), *($1, 41), *($1, 42), *($1, 43), *($1, 44), *($1, 45), *($1, 46), *($1, 47), *($1, 48), *($1, 49), *($1, 50), *($1, 51), *($1, 52), *($1, 53), *($1, 54), *($1, 55), *($1, 56), *($1, 57), *($1, 58), *($1, 59), *($1, 60), *($1, 61), *($1, 62), *($1, 63), *($1, 64), *($1, 65)]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, datagen_source]])
> group count must be less than 64.
> Please check the documentation for the set of currently supported SQL features.
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:86)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:92)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at scala.collection.immutable.List.foreach(List.scala:388)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:312)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:192)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1688)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:840)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1342)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:708)
>     at com.shopee.di.Test64Distinct.main(Test64Distinct.java:11)
> Caused by: org.apache.flink.table.api.TableException: group count must be less than 64.
>     at org.apache.flink.table.planner.plan.rules.logical.DecomposeGroupingSetsRule.onMatch(DecomposeGroupingSetsRule.scala:177)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     ... 27 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)