Posted to dev@flink.apache.org by "Sergey Nuyanzin (Jira)" <ji...@apache.org> on 2022/09/06 06:54:00 UTC

[jira] [Created] (FLINK-29204) JoinITCase#testUncorrelatedScalar fails with "Cannot generate a valid execution plan for the given query" after Calcite 1.27 update

Sergey Nuyanzin created FLINK-29204:
---------------------------------------

             Summary: JoinITCase#testUncorrelatedScalar fails with "Cannot generate a valid execution plan for the given query" after Calcite 1.27 update
                 Key: FLINK-29204
                 URL: https://issues.apache.org/jira/browse/FLINK-29204
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API, Table SQL / Planner
            Reporter: Sergey Nuyanzin


{noformat}

org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalSink(table=[*anonymous_collect$69*], fields=[b])
+- FlinkLogicalCalc(select=[EXPR$0 AS b])
   +- FlinkLogicalJoin(condition=[true], joinType=[left])
      :- FlinkLogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
      +- FlinkLogicalValues(type=[RecordType(INTEGER EXPR$0)], tuples=[[{ 1 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1723)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:860)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1368)
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:475)
    at org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308)
    at org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:144)
    at org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:108)
    at org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCase.testUncorrelatedScalar(JoinITCase.scala:1061)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> BATCH_PHYSICAL]
There is 1 empty subset: rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows
62538:FlinkLogicalJoin(condition=[true], joinType=[left])
  62508:FlinkLogicalValues(subset=[rel#62536:RelSubset#5.LOGICAL.any.[0]], tuples=[[{ 0 }]])
  62510:FlinkLogicalValues(subset=[rel#62537:RelSubset#6.LOGICAL.any.[0]], tuples=[[{ 1 }]])

Root: rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[]
Original rel:
FlinkLogicalSink(subset=[rel#62506:RelSubset#4.LOGICAL.any.[]], table=[*anonymous_collect$69*], fields=[b]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62517
  FlinkLogicalCalc(subset=[rel#62516:RelSubset#3.LOGICAL.any.[]], select=[EXPR$0 AS b]): rowcount = 1.0, cumulative cost = {1.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62520
    FlinkLogicalJoin(subset=[rel#62514:RelSubset#2.LOGICAL.any.[]], condition=[true], joinType=[left]): rowcount = 1.0, cumulative cost = {1.0 rows, 2.0 cpu, 5.0 io, 0.0 network, 0.0 memory}, id = 62513
      FlinkLogicalValues(subset=[rel#62509:RelSubset#0.LOGICAL.any.[0]], tuples=[[{ 0 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62508
      FlinkLogicalValues(subset=[rel#62511:RelSubset#1.LOGICAL.any.[0]], tuples=[[{ 1 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 62510

Sets:
Set#5, type: RecordType(INTEGER ZERO)
    rel#62536:RelSubset#5.LOGICAL.any.[0], best=rel#62508
        rel#62508:FlinkLogicalValues.LOGICAL.any.[0](type=RecordType(INTEGER ZERO),tuples=[{ 0 }]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
    rel#62548:RelSubset#5.BATCH_PHYSICAL.any.[0], best=rel#62547
        rel#62547:BatchPhysicalValues.BATCH_PHYSICAL.any.[0](type=RecordType(INTEGER ZERO),tuples=[{ 0 }],values=ZERO), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
Set#6, type: RecordType(INTEGER EXPR$0)
    rel#62537:RelSubset#6.LOGICAL.any.[0], best=rel#62510
        rel#62510:FlinkLogicalValues.LOGICAL.any.[0](type=RecordType(INTEGER EXPR$0),tuples=[{ 1 }]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
    rel#62550:RelSubset#6.BATCH_PHYSICAL.any.[0], best=rel#62549
        rel#62549:BatchPhysicalValues.BATCH_PHYSICAL.any.[0](type=RecordType(INTEGER EXPR$0),tuples=[{ 1 }],values=EXPR$0), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}
Set#7, type: RecordType(INTEGER ZERO, INTEGER EXPR$0)
    rel#62539:RelSubset#7.LOGICAL.any.[], best=rel#62538
        rel#62538:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left), rowcount=1.0, cumulative cost={3.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
    rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[], best=null
Set#8, type: RecordType(INTEGER b)
    rel#62541:RelSubset#8.LOGICAL.any.[], best=rel#62540
        rel#62540:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#62539,select=EXPR$0 AS b), rowcount=1.0, cumulative cost={4.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
    rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[], best=null
        rel#62552:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#62551,select=EXPR$0 AS b), rowcount=1.0, cumulative cost={inf}
Set#9, type: RecordType(INTEGER b)
    rel#62544:RelSubset#9.LOGICAL.any.[], best=rel#62543
        rel#62543:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#62541,table=*anonymous_collect$69*,fields=b), rowcount=1.0, cumulative cost={5.0 rows, 5.0 cpu, 5.0 io, 0.0 network, 0.0 memory}
    rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[], best=null
        rel#62546:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#62544,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=1.0, cumulative cost={inf}
        rel#62554:BatchPhysicalSink.BATCH_PHYSICAL.any.[](input=RelSubset#62553,table=*anonymous_collect$69*,fields=b), rowcount=1.0, cumulative cost={inf}

Graphviz:
digraph G {
    root [style=filled,label="Root"];
    subgraph cluster5{
        label="Set 5 RecordType(INTEGER ZERO)";
        rel62508 [label="rel#62508:FlinkLogicalValues\ntype=RecordType(INTEGER ZERO),tuples=[{ 0 }]\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel62547 [label="rel#62547:BatchPhysicalValues\ntype=RecordType(INTEGER ZERO),tuples=[{ 0 }],values=ZERO\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset62536 [label="rel#62536:RelSubset#5.LOGICAL.any.[0]"]
        subset62548 [label="rel#62548:RelSubset#5.BATCH_PHYSICAL.any.[0]"]
    }
    subgraph cluster6{
        label="Set 6 RecordType(INTEGER EXPR$0)";
        rel62510 [label="rel#62510:FlinkLogicalValues\ntype=RecordType(INTEGER EXPR$0),tuples=[{ 1 }]\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel62549 [label="rel#62549:BatchPhysicalValues\ntype=RecordType(INTEGER EXPR$0),tuples=[{ 1 }],values=EXPR$0\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset62537 [label="rel#62537:RelSubset#6.LOGICAL.any.[0]"]
        subset62550 [label="rel#62550:RelSubset#6.BATCH_PHYSICAL.any.[0]"]
    }
    subgraph cluster7{
        label="Set 7 RecordType(INTEGER ZERO, INTEGER EXPR$0)";
        rel62538 [label="rel#62538:FlinkLogicalJoin\nleft=RelSubset#62536,right=RelSubset#62537,condition=true,joinType=left\nrows=1.0, cost={3.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset62539 [label="rel#62539:RelSubset#7.LOGICAL.any.[]"]
        subset62551 [label="rel#62551:RelSubset#7.BATCH_PHYSICAL.any.[]",color=red]
    }
    subgraph cluster8{
        label="Set 8 RecordType(INTEGER b)";
        rel62540 [label="rel#62540:FlinkLogicalCalc\ninput=RelSubset#62539,select=EXPR$0 AS b\nrows=1.0, cost={4.0 rows, 4.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel62552 [label="rel#62552:BatchPhysicalCalc\ninput=RelSubset#62551,select=EXPR$0 AS b\nrows=1.0, cost={inf}",shape=box]
        subset62541 [label="rel#62541:RelSubset#8.LOGICAL.any.[]"]
        subset62553 [label="rel#62553:RelSubset#8.BATCH_PHYSICAL.any.[]"]
    }
    subgraph cluster9{
        label="Set 9 RecordType(INTEGER b)";
        rel62543 [label="rel#62543:FlinkLogicalSink\ninput=RelSubset#62541,table=*anonymous_collect$69*,fields=b\nrows=1.0, cost={5.0 rows, 5.0 cpu, 5.0 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel62546 [label="rel#62546:AbstractConverter\ninput=RelSubset#62544,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0, cost={inf}",shape=box]
        rel62554 [label="rel#62554:BatchPhysicalSink\ninput=RelSubset#62553,table=*anonymous_collect$69*,fields=b\nrows=1.0, cost={inf}",shape=box]
        subset62544 [label="rel#62544:RelSubset#9.LOGICAL.any.[]"]
        subset62545 [label="rel#62545:RelSubset#9.BATCH_PHYSICAL.any.[]"]
    }
    root -> subset62545;
    subset62536 -> rel62508[color=blue];
    subset62548 -> rel62547[color=blue];
    subset62537 -> rel62510[color=blue];
    subset62550 -> rel62549[color=blue];
    subset62539 -> rel62538[color=blue]; rel62538 -> subset62536[color=blue,label="0"]; rel62538 -> subset62537[color=blue,label="1"];
    subset62541 -> rel62540[color=blue]; rel62540 -> subset62539[color=blue];
    subset62553 -> rel62552; rel62552 -> subset62551;
    subset62544 -> rel62543[color=blue]; rel62543 -> subset62541[color=blue];
    subset62545 -> rel62546; rel62546 -> subset62544;
    subset62545 -> rel62554; rel62554 -> subset62553;
}
    at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:709)
    at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:390)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:539)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:316)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    ... 72 more

 

{noformat}
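
For reference, the trace above corresponds to an uncorrelated scalar subquery: the FlinkLogicalJoin with condition=[true] and joinType=[left] pairs a synthetic single-row Values (INTEGER ZERO) with the single-row Values produced by the subquery, and the Calc projects EXPR$0 AS b. The exact statement used by JoinITCase#testUncorrelatedScalar is not quoted here, but a minimal standalone sketch of a query with that shape (an assumed reproduction, not the test itself) would be:

{code:scala}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object UncorrelatedScalarRepro {
  def main(args: Array[String]): Unit = {
    // Batch mode, matching the BATCH_PHYSICAL convention that fails to plan above.
    val settings = EnvironmentSettings.newInstance().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Assumed reproduction: an uncorrelated scalar subquery in the select list
    // plans as a left join with a literal TRUE condition between a synthetic
    // single-row Values and the subquery's single-row Values.
    tEnv.executeSql("SELECT (SELECT 1) AS b").print()
  }
}
{code}

The CannotPlanException reports the missing conversion for exactly that FlinkLogicalJoin node (LOGICAL -> BATCH_PHYSICAL), which is why the batch planner cannot produce a physical plan for this query shape after the Calcite 1.27 upgrade.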



--
This message was sent by Atlassian Jira
(v8.20.10#820010)