Posted to dev@flink.apache.org by "Spongebob (Jira)" <ji...@apache.org> on 2022/03/03 09:23:00 UTC

[jira] [Created] (FLINK-26461) Throw CannotPlanException in TableFunction

Spongebob created FLINK-26461:
---------------------------------

             Summary: Throw CannotPlanException in TableFunction
                 Key: FLINK-26461
                 URL: https://issues.apache.org/jira/browse/FLINK-26461
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.14.3
            Reporter: Spongebob


I get a CannotPlanException when I change a TableFunction's isDeterministic() to return false. The function is:
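{code:java}
import org.apache.flink.table.functions.TableFunction;

// Emits 0 until the wall clock passes a fixed cutoff, then latches and emits 1.
public class GetDayTimeEtlSwitch extends TableFunction<Integer> {
    // Set to true once the cutoff has passed; never reset afterwards.
    private boolean status = false;

    // The result depends on the wall clock, so the function is declared non-deterministic.
    @Override
    public boolean isDeterministic() {
        return false;
    }

    public void eval() {
        if (status) {
            collect(1);
        } else {
            if (System.currentTimeMillis() > 1646298908000L) {
                status = true;
                collect(1);
            } else {
                collect(0);
            }
        }
    }
}
{code}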
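For anyone who wants to check the switch logic without a Flink dependency, here is a minimal sketch. SwitchSketch, its injected clock and the emitted() list are hypothetical stand-ins that are not part of the original job; only the cutoff value and the branching are taken from the function above. It shows that the output depends on both the wall clock and internal state, which is why the function is declared non-deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for GetDayTimeEtlSwitch, with no Flink dependency. */
public class SwitchSketch {
    // Same cutoff as in the reported function (epoch milliseconds).
    public static final long CUTOFF_MILLIS = 1646298908000L;

    // Injected clock (an assumption for testability; the original calls System.currentTimeMillis()).
    private final LongSupplier clock;
    // Stands in for Flink's collect(...): records every emitted value.
    private final List<Integer> out = new ArrayList<>();
    private boolean status = false;

    public SwitchSketch(LongSupplier clock) {
        this.clock = clock;
    }

    public void eval() {
        // Latch the switch the first time the clock is past the cutoff.
        if (!status && clock.getAsLong() > CUTOFF_MILLIS) {
            status = true;
        }
        out.add(status ? 1 : 0);
    }

    public List<Integer> emitted() {
        return out;
    }

    public static void main(String[] args) {
        long[] now = {CUTOFF_MILLIS - 1};
        SwitchSketch fn = new SwitchSketch(() -> now[0]);
        fn.eval();                    // before the cutoff
        now[0] = CUTOFF_MILLIS + 1;
        fn.eval();                    // after the cutoff: the switch latches
        now[0] = CUTOFF_MILLIS - 1;
        fn.eval();                    // clock moved back, switch stays latched
        System.out.println(fn.emitted()); // prints [0, 1, 1]
    }
}
```

The same call with the same (empty) argument list returns different values over time, so a planner must not constant-fold or cache it.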
Exception stack trace:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0])
+- FlinkLogicalJoin(condition=[true], joinType=[left])
   :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME])
   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE])
   +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
    at TestSwitch.main(TestSwitch.java:33)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single]
There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)])

Root: rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 151
  FlinkLogicalJoin(subset=[rel#150:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[true], joinType=[left]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 4.800000001E9 io, 0.0 network, 0.0 memory}, id = 149
    FlinkLogicalCalc(subset=[rel#148:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[STUNAME, SUBJECT, SCORE, PROCTIME() AS PROC_TIME]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 153
      FlinkLogicalTableSourceScan(subset=[rel#143:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 142
    FlinkLogicalTableFunctionScan(subset=[rel#146:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER EXPR$0)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 145

Sets:
Set#5, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)
    rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#142
        rel#142:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
    rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#183
        rel#183:StreamPhysicalTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Set#6, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)
    rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#173
        rel#173:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
    rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#185
        rel#185:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
        rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
        rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}
    rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=rel#194
        rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
        rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}
Set#7, type: RecordType(INTEGER EXPR$0)
    rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#168
        rel#168:FlinkLogicalTableFunctionScan.LOGICAL.any.None: 0.[NONE].[NONE](invocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
    rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null
Set#8, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)
    rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#176
        rel#176:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#174,right=RelSubset#175,condition=true,joinType=left), rowcount=1.0E8, cumulative cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}
    rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
        rel#190:StreamPhysicalJoin.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](left=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey), rowcount=1.0E8, cumulative cost={inf}
Set#9, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)
    rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#178
        rel#178:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}
    rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
        rel#182:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
        rel#192:StreamPhysicalSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative cost={inf}

Graphviz:
digraph G {
    root [style=filled,label="Root"];
    subgraph cluster5{
        label="Set 5 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)";
        rel142 [label="rel#142:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel183 [label="rel#183:StreamPhysicalTableSourceScan\ntable=[default_catalog, default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset172 [label="rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset184 [label="rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
    }
    subgraph cluster6{
        label="Set 6 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)";
        rel173 [label="rel#173:FlinkLogicalCalc\ninput=RelSubset#172,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel185 [label="rel#185:StreamPhysicalCalc\ninput=RelSubset#184,select=STUNAME, SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel188 [label="rel#188:AbstractConverter\ninput=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
        rel194 [label="rel#194:StreamPhysicalExchange\ninput=RelSubset#186,distribution=single\nrows=1.0E8, cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory}",color=blue,shape=box]
        subset174 [label="rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset186 [label="rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
        subset187 [label="rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]"]
        subset186 -> subset187;    }
    subgraph cluster7{
        label="Set 7 RecordType(INTEGER EXPR$0)";
        rel168 [label="rel#168:FlinkLogicalTableFunctionScan\ninvocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset175 [label="rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset189 [label="rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]",color=red]
    }
    subgraph cluster8{
        label="Set 8 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)";
        rel176 [label="rel#176:FlinkLogicalJoin\nleft=RelSubset#174,right=RelSubset#175,condition=true,joinType=left\nrows=1.0E8, cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel190 [label="rel#190:StreamPhysicalJoin\nleft=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey\nrows=1.0E8, cost={inf}",shape=box]
        subset177 [label="rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset191 [label="rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
    }
    subgraph cluster9{
        label="Set 9 RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)";
        rel178 [label="rel#178:FlinkLogicalSink\ninput=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel182 [label="rel#182:AbstractConverter\ninput=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
        rel192 [label="rel#192:StreamPhysicalSink\ninput=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={inf}",shape=box]
        subset179 [label="rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset181 [label="rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
    }
    root -> subset181;
    subset172 -> rel142[color=blue];
    subset184 -> rel183[color=blue];
    subset174 -> rel173[color=blue]; rel173 -> subset172[color=blue];
    subset186 -> rel185[color=blue]; rel185 -> subset184[color=blue];
    subset187 -> rel188; rel188 -> subset186;
    subset187 -> rel194[color=blue]; rel194 -> subset186[color=blue];
    subset175 -> rel168[color=blue];
    subset177 -> rel176[color=blue]; rel176 -> subset174[color=blue,label="0"]; rel176 -> subset175[color=blue,label="1"];
    subset191 -> rel190; rel190 -> subset187[label="0"]; rel190 -> subset189[label="1"];
    subset179 -> rel178[color=blue]; rel178 -> subset177[color=blue];
    subset181 -> rel182; rel182 -> subset179;
    subset181 -> rel192; rel192 -> subset191;
}
    at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
    at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
    ... 23 more
 {code}
 


