You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2022/03/03 10:04:00 UTC

[jira] [Commented] (FLINK-26461) Throw CannotPlanException in TableFunction

    [ https://issues.apache.org/jira/browse/FLINK-26461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500627#comment-17500627 ] 

lincoln lee commented on FLINK-26461:
-------------------------------------

[~SpongebobZ] Thanks for reporting this!
Could you offer more details about the test query? It may helps.

> Throw CannotPlanException in TableFunction
> ------------------------------------------
>
>                 Key: FLINK-26461
>                 URL: https://issues.apache.org/jira/browse/FLINK-26461
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.3
>            Reporter: Spongebob
>            Priority: Major
>
> I got an CannotPlanException when change the isDeterministic option to false. For detail see this code:
> {code:java}
> //代码占位符
> public class GetDayTimeEtlSwitch extends TableFunction<Integer> {
>     private boolean status = false;
>     @Override
>     public boolean isDeterministic() {
>         return false;
>     }
>     public void eval() {
>         if (status) {
>             collect(1);
>         } else {
>             if (System.currentTimeMillis() > 1646298908000L) {
>                 status = true;
>                 collect(1);
>             } else {
>                 collect(0);
>             }
>         }
>     }
> } {code}
> Exception stack...
> {code:java}
> //代码占位符
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0])
> +- FlinkLogicalJoin(condition=[true], joinType=[left])
>    :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME])
>    :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE])
>    +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
>     at TestSwitch.main(TestSwitch.java:33)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
> Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single]
> There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
> 168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])Root: rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
> Original rel:
> FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 151
>   FlinkLogicalJoin(subset=[rel#150:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[true], joinType=[left]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 4.800000001E9 io, 0.0 network, 0.0 memory}, id = 149
>     FlinkLogicalCalc(subset=[rel#148:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[STUNAME, SUBJECT, SCORE, PROCTIME() AS PROC_TIME]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 153
>       FlinkLogicalTableSourceScan(subset=[rel#143:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 142
>     FlinkLogicalTableFunctionScan(subset=[rel#146:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 145Sets:
> Set#5, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)
>     rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#142
>         rel#142:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
>     rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#183
>         rel#183:StreamPhysicalTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
> Set#6, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)
>     rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#173
>         rel#173:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
>     rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#185
>         rel#185:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
>         rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
>         rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}
>     rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=rel#194
>         rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
>         rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}
> Set#7, type: RecordType(INTEGER EXPR$0)
>     rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#168
>         rel#168:FlinkLogicalTableFunctionScan.LOGICAL.any.None: 0.[NONE].[NONE](invocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
>     rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null
> Set#8, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)
>     rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#176
>         rel#176:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#174,right=RelSubset#175,condition=true,joinType=left), rowcount=1.0E8, cumulative cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}
>     rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
>         rel#190:StreamPhysicalJoin.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](left=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey), rowcount=1.0E8, cumulative cost={inf}
> Set#9, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)
>     rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#178
>         rel#178:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}
>     rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
>         rel#182:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
>         rel#192:StreamPhysicalSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={inf}Graphviz:
> digraph G {
>     root [style=filled,label="Root"];
>     subgraph cluster5{
>         label="Set 5 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)";
>         rel142 [label="rel#142:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         rel183 [label="rel#183:StreamPhysicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         subset172 [label="rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE]"]
>         subset184 [label="rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
>     }
>     subgraph cluster6{
>         label="Set 6 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)";
>         rel173 [label="rel#173:FlinkLogicalCalc\ninput=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         rel185 [label="rel#185:StreamPhysicalCalc\ninput=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         rel188 [label="rel#188:AbstractConverter\ninput=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
>         rel194 [label="rel#194:StreamPhysicalExchange\ninput=RelSubset#186,distribution=single\nrows=1.0E8, cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}",color=blue,shape=box]
>         subset174 [label="rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE]"]
>         subset186 [label="rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
>         subset187 [label="rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]"]
>         subset186 -> subset187;    }
>     subgraph cluster7{
>         label="Set 7 RecordType(INTEGER EXPR$0)";
>         rel168 [label="rel#168:FlinkLogicalTableFunctionScan\ninvocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         subset175 [label="rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]"]
>         subset189 [label="rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]",color=red]
>     }
>     subgraph cluster8{
>         label="Set 8 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)";
>         rel176 [label="rel#176:FlinkLogicalJoin\nleft=RelSubset#174,right=RelSubset#175,condition=true,joinType=left\nrows=1.0E8, cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         rel190 [label="rel#190:StreamPhysicalJoin\nleft=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey\nrows=1.0E8, cost={inf}",shape=box]
>         subset177 [label="rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE]"]
>         subset191 [label="rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
>     }
>     subgraph cluster9{
>         label="Set 9 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)";
>         rel178 [label="rel#178:FlinkLogicalSink\ninput=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
>         rel182 [label="rel#182:AbstractConverter\ninput=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
>         rel192 [label="rel#192:StreamPhysicalSink\ninput=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={inf}",shape=box]
>         subset179 [label="rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]"]
>         subset181 [label="rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
>     }
>     root -> subset181;
>     subset172 -> rel142[color=blue];
>     subset184 -> rel183[color=blue];
>     subset174 -> rel173[color=blue]; rel173 -> subset172[color=blue];
>     subset186 -> rel185[color=blue]; rel185 -> subset184[color=blue];
>     subset187 -> rel188; rel188 -> subset186;
>     subset187 -> rel194[color=blue]; rel194 -> subset186[color=blue];
>     subset175 -> rel168[color=blue];
>     subset177 -> rel176[color=blue]; rel176 -> subset174[color=blue,label="0"]; rel176 -> subset175[color=blue,label="1"];
>     subset191 -> rel190; rel190 -> subset187[label="0"]; rel190 -> subset189[label="1"];
>     subset179 -> rel178[color=blue]; rel178 -> subset177[color=blue];
>     subset181 -> rel182; rel182 -> subset179;
>     subset181 -> rel192; rel192 -> subset191;
> }
>     at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
>     at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>     ... 23 more
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)