Posted to dev@flink.apache.org by "Spongebob (Jira)" <ji...@apache.org> on 2022/02/28 02:14:00 UTC

[jira] [Created] (FLINK-26384) This exception indicates that the query uses an unsupported SQL feature.

Spongebob created FLINK-26384:
---------------------------------

             Summary: This exception indicates that the query uses an unsupported SQL feature.
                 Key: FLINK-26384
                 URL: https://issues.apache.org/jira/browse/FLINK-26384
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.14.3
            Reporter: Spongebob


The planner throws an "unsupported SQL feature" exception instead of producing a valid execution plan for the job below.
{code:java}
// code placeholder
TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

// HERE'S DDL AND DML SQL
CREATE TABLE IF NOT EXISTS TABLE_A (CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP DECIMAL(20,4),TSCP DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),SCR_INCD DECIMAL(19,0),AST_TYPE DECIMAL(3,0),SCR_TYPE DECIMAL(3,0)) WITH ('connector' = 'jdbc'...)

CREATE TABLE IF NOT EXISTS TABLE_B (AST_TYPE DECIMAL(4,0),CIR_CAPT DECIMAL(20,4),FULL_MKT_CIR_CAPT DECIMAL(20,4),FULL_MKT_TSCP DECIMAL(20,4),SCR_INCD DECIMAL(19,0),SCR_TYPE DECIMAL(8,0),TSCP DECIMAL(20,4),  PRIMARY KEY (SCR_INCD) NOT ENFORCED)  WITH ('connector' = 'jdbc'...)

INSERT INTO TABLE_B 
SELECT T_SOURCE.* FROM (
SELECT CASE WHEN B.AST_TYPE IS NULL THEN 1 ELSE B.AST_TYPE END AS AST_TYPE, B.CIR_CAPT AS CIR_CAPT, B.FULL_MKT_CIR_CAPT AS FULL_MKT_CIR_CAPT, B.FULL_MKT_TSCP AS FULL_MKT_TSCP, B.SCR_INCD AS SCR_INCD, CASE WHEN B.SCR_TYPE IS NULL THEN 1 ELSE B.SCR_TYPE END AS SCR_TYPE, B.TSCP AS TSCP FROM TABLE_A B
) T_SOURCE INNER JOIN ( SELECT DISTINCT SCR_INCD FROM TABLE_B) T_SINK ON T_SOURCE.SCR_INCD = T_SINK.SCR_INCD    

// EXCEPTION STACK TRACE
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:85)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:56)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
    at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124)
    at com.xctech.cone.etl.migrate.batch.runner.RunSingleTask.main(RunSingleTask.java:111)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversions are FlinkLogicalTableSourceScan[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[4]true], FlinkLogicalCalc[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> broadcast], FlinkLogicalTableSourceScan[convention: LOGICAL -> BATCH_PHYSICAL]
There are 3 empty subsets:
Empty subset 0: rel#782:RelSubset#23.BATCH_PHYSICAL.broadcast.[], the relevant part of the original plan is as follows
755:FlinkLogicalCalc(select=[SCR_INCD])
  710:FlinkLogicalTableSourceScan(subset=[rel#754:RelSubset#22.LOGICAL.any.[]], table=[[default_catalog, default_database, TABLE_B]], fields=[AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP])
Empty subset 1: rel#770:RelSubset#22.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows
710:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, TABLE_B]], fields=[AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP])
Empty subset 2: rel#799:RelSubset#22.BATCH_PHYSICAL.hash[4]true.[], the relevant part of the original plan is as follows
710:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, TABLE_B]], fields=[AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP])
Root: rel#764:RelSubset#26.BATCH_PHYSICAL.any.[]
Original rel:
FlinkLogicalSink(subset=[rel#305:RelSubset#6.LOGICAL.any.[]], table=[default_catalog.default_database.TABLE_B], fields=[AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP]): rowcount = 3.0E7, cumulative cost = {3.0E7 rows, 3.0E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 322
  FlinkLogicalCalc(subset=[rel#321:RelSubset#5.LOGICAL.any.[]], select=[CAST(AST_TYPE) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CAST(SCR_TYPE) AS SCR_TYPE, TSCP]): rowcount = 3.0E7, cumulative cost = {3.0E7 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 330
    FlinkLogicalJoin(subset=[rel#319:RelSubset#4.LOGICAL.any.[]], condition=[=($4, $7)], joinType=[inner]): rowcount = 3.0E7, cumulative cost = {1.0E8 rows, 2.0E8 cpu, 8.5E9 io, 0.0 network, 0.0 memory}, id = 318
      FlinkLogicalCalc(subset=[rel#316:RelSubset#1.LOGICAL.any.[]], select=[CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 2.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 326
        FlinkLogicalTableSourceScan(subset=[rel#308:RelSubset#0.LOGICAL.any.[]], table=[[default_catalog, default_database, VAR__SCR_EXT_INFO_KRSCSN]], fields=[CIR_CAPT, FULL_MKT_TSCP, TSCP, FULL_MKT_CIR_CAPT, SCR_INCD, _AST_TYPE, _SCR_TYPE]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}, id = 307
      FlinkLogicalTableSourceScan(subset=[rel#317:RelSubset#3.LOGICAL.any.[]], table=[[default_catalog, default_database, TABLE_B, project=[SCR_INCD]]], fields=[SCR_INCD]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 1.2E9 io, 0.0 network, 0.0 memory}, id = 327
Sets:
Set#20, type: RecordType(DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(20, 4) TSCP, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(19, 0) SCR_INCD, DECIMAL(3, 0) _AST_TYPE, DECIMAL(3, 0) _SCR_TYPE)
    rel#751:RelSubset#20.LOGICAL.any.[], best=rel#707
        rel#707:FlinkLogicalTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, VAR__SCR_EXT_INFO_KRSCSN],fields=CIR_CAPT, FULL_MKT_TSCP, TSCP, FULL_MKT_CIR_CAPT, SCR_INCD, _AST_TYPE, _SCR_TYPE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}
    rel#767:RelSubset#20.BATCH_PHYSICAL.any.[], best=rel#766
        rel#766:BatchPhysicalTableSourceScan.BATCH_PHYSICAL.any.[](table=[default_catalog, default_database, VAR__SCR_EXT_INFO_KRSCSN],fields=CIR_CAPT, FULL_MKT_TSCP, TSCP, FULL_MKT_CIR_CAPT, SCR_INCD, _AST_TYPE, _SCR_TYPE), rowcount=1.0E8, cumulative cost={1.0E8 rows, 0.0 cpu, 8.4E9 io, 0.0 network, 0.0 memory}
        rel#794:AbstractConverter.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#767,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[4]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#809:BatchPhysicalExchange.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#767,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.68E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}
    rel#793:RelSubset#20.BATCH_PHYSICAL.hash[4]true.[], best=rel#809
        rel#794:AbstractConverter.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#767,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[4]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#809:BatchPhysicalExchange.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#767,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.68E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}
Set#21, type: RecordType(DECIMAL(10, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(10, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)
    rel#753:RelSubset#21.LOGICAL.any.[], best=rel#752
        rel#752:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#751,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP), rowcount=1.0E8, cumulative cost={2.0E8 rows, 3.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}
    rel#769:RelSubset#21.BATCH_PHYSICAL.any.[], best=rel#768
        rel#768:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#767,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}
        rel#774:AbstractConverter.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#769,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[4]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#790:BatchPhysicalExchange.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#769,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.7E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}
        rel#795:BatchPhysicalCalc.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#793,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.7E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}
    rel#773:RelSubset#21.BATCH_PHYSICAL.hash[4]true.[], best=rel#790
        rel#774:AbstractConverter.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#769,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[4]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#790:BatchPhysicalExchange.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#769,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.7E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}
        rel#795:BatchPhysicalCalc.BATCH_PHYSICAL.hash[4]true.[](input=RelSubset#793,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP), rowcount=1.0E8, cumulative cost={3.0E8 rows, 1.7E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}
Set#22, type: RecordType(DECIMAL(4, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(8, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)
    rel#754:RelSubset#22.LOGICAL.any.[], best=rel#710
        rel#710:FlinkLogicalTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, TABLE_B],fields=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}
    rel#770:RelSubset#22.BATCH_PHYSICAL.any.[], best=null
    rel#799:RelSubset#22.BATCH_PHYSICAL.hash[4]true.[], best=null
Set#23, type: RecordType(DECIMAL(19, 0) SCR_INCD)
    rel#756:RelSubset#23.LOGICAL.any.[], best=rel#755
        rel#755:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#754,select=SCR_INCD), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}
    rel#772:RelSubset#23.BATCH_PHYSICAL.any.[], best=null
        rel#771:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#770,select=SCR_INCD), rowcount=1.0E8, cumulative cost={inf}
        rel#776:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#772,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#783:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#772,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#798:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#775,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#797:BatchPhysicalExchange.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#772,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={inf}
        rel#800:BatchPhysicalCalc.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#799,select=SCR_INCD), rowcount=1.0E8, cumulative cost={inf}
        rel#803:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#782,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#802:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#772,distribution=broadcast), rowcount=1.0E8, cumulative cost={inf}
        rel#805:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#775,distribution=broadcast), rowcount=1.0E8, cumulative cost={inf}
        rel#807:BatchPhysicalExchange.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#782,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={inf}
    rel#775:RelSubset#23.BATCH_PHYSICAL.hash[0]true.[], best=null
        rel#776:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#772,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#797:BatchPhysicalExchange.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#772,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={inf}
        rel#800:BatchPhysicalCalc.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#799,select=SCR_INCD), rowcount=1.0E8, cumulative cost={inf}
        rel#803:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#782,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#807:BatchPhysicalExchange.BATCH_PHYSICAL.hash[0]true.[](input=RelSubset#782,distribution=hash[SCR_INCD]), rowcount=1.0E8, cumulative cost={inf}
    rel#782:RelSubset#23.BATCH_PHYSICAL.broadcast.[], best=null
        rel#783:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#772,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#798:AbstractConverter.BATCH_PHYSICAL.broadcast.[](input=RelSubset#775,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]), rowcount=1.0E8, cumulative cost={inf}
        rel#802:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#772,distribution=broadcast), rowcount=1.0E8, cumulative cost={inf}
        rel#805:BatchPhysicalExchange.BATCH_PHYSICAL.broadcast.[](input=RelSubset#775,distribution=broadcast), rowcount=1.0E8, cumulative cost={inf}
Set#24, type: RecordType(DECIMAL(10, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(10, 0) SCR_TYPE, DECIMAL(20, 4) TSCP, DECIMAL(19, 0) SCR_INCD0)
    rel#758:RelSubset#24.LOGICAL.any.[], best=rel#757
        rel#757:FlinkLogicalJoin.LOGICAL.any.[](left=RelSubset#753,right=RelSubset#756,condition==($4, $7),joinType=inner), rowcount=3.0E7, cumulative cost={5.0E8 rows, 6.0E8 cpu, 2.53E10 io, 0.0 network, 0.0 memory}
    rel#778:RelSubset#24.BATCH_PHYSICAL.any.[], best=null
        rel#777:BatchPhysicalHashJoin.BATCH_PHYSICAL.any.[](left=RelSubset#773,right=RelSubset#775,joinType=InnerJoin,where==(SCR_INCD, SCR_INCD0),select=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP, SCR_INCD0,build=right), rowcount=3.0E7, cumulative cost={inf}
        rel#780:BatchPhysicalSortMergeJoin.BATCH_PHYSICAL.any.[](left=RelSubset#773,right=RelSubset#775,joinType=InnerJoin,where==(SCR_INCD, SCR_INCD0),select=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP, SCR_INCD0), rowcount=3.0E7, cumulative cost={inf}
        rel#784:BatchPhysicalNestedLoopJoin.BATCH_PHYSICAL.any.[](left=RelSubset#769,right=RelSubset#782,joinType=InnerJoin,where==(SCR_INCD, SCR_INCD0),select=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP, SCR_INCD0,build=right), rowcount=3.0E7, cumulative cost={inf}
Set#25, type: RecordType(DECIMAL(4, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(8, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)
    rel#760:RelSubset#25.LOGICAL.any.[], best=rel#759
        rel#759:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#758,select=CAST(AST_TYPE) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CAST(SCR_TYPE) AS SCR_TYPE, TSCP), rowcount=3.0E7, cumulative cost={5.3E8 rows, 6.0E8 cpu, 2.53E10 io, 0.0 network, 0.0 memory}
    rel#787:RelSubset#25.BATCH_PHYSICAL.any.[], best=null
        rel#786:BatchPhysicalCalc.BATCH_PHYSICAL.any.[](input=RelSubset#778,select=CAST(AST_TYPE) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CAST(SCR_TYPE) AS SCR_TYPE, TSCP), rowcount=3.0E7, cumulative cost={inf}
Set#26, type: RecordType(DECIMAL(4, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(8, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)
    rel#763:RelSubset#26.LOGICAL.any.[], best=rel#762
        rel#762:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#760,table=default_catalog.default_database.TABLE_B,fields=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP), rowcount=3.0E7, cumulative cost={5.6E8 rows, 6.3E8 cpu, 2.53E10 io, 0.0 network, 0.0 memory}
    rel#764:RelSubset#26.BATCH_PHYSICAL.any.[], best=null
        rel#765:AbstractConverter.BATCH_PHYSICAL.any.[](input=RelSubset#763,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]), rowcount=3.0E7, cumulative cost={inf}
        rel#788:BatchPhysicalSink.BATCH_PHYSICAL.any.[](input=RelSubset#787,table=default_catalog.default_database.TABLE_B,fields=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP), rowcount=3.0E7, cumulative cost={inf}
Graphviz:
digraph G {
    root [style=filled,label="Root"];
    subgraph cluster20{
        label="Set 20 RecordType(DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(20, 4) TSCP, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(19, 0) SCR_INCD, DECIMAL(3, 0) _AST_TYPE, DECIMAL(3, 0) _SCR_TYPE)";
        rel707 [label="rel#707:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, VAR__SCR_EXT_INFO_KRSCSN],fields=CIR_CAPT, FULL_MKT_TSCP, TSCP, FULL_MKT_CIR_CAPT, SCR_INCD, _AST_TYPE, _SCR_TYPE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel766 [label="rel#766:BatchPhysicalTableSourceScan\ntable=[default_catalog, default_database, VAR__SCR_EXT_INFO_KRSCSN],fields=CIR_CAPT, FULL_MKT_TSCP, TSCP, FULL_MKT_CIR_CAPT, SCR_INCD, _AST_TYPE, _SCR_TYPE\nrows=1.0E8, cost={1.0E8 rows, 0.0 cpu, 8.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel794 [label="rel#794:AbstractConverter\ninput=RelSubset#767,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[4]true,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
        rel809 [label="rel#809:BatchPhysicalExchange\ninput=RelSubset#767,distribution=hash[SCR_INCD]\nrows=1.0E8, cost={2.0E8 rows, 1.68E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}",color=blue,shape=box]
        subset751 [label="rel#751:RelSubset#20.LOGICAL.any.[]"]
        subset767 [label="rel#767:RelSubset#20.BATCH_PHYSICAL.any.[]"]
        subset793 [label="rel#793:RelSubset#20.BATCH_PHYSICAL.hash[4]true.[]"]
        subset767 -> subset793;    }
    subgraph cluster21{
        label="Set 21 RecordType(DECIMAL(10, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(10, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)";
        rel752 [label="rel#752:FlinkLogicalCalc\ninput=RelSubset#751,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP\nrows=1.0E8, cost={2.0E8 rows, 3.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel768 [label="rel#768:BatchPhysicalCalc\ninput=RelSubset#767,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel774 [label="rel#774:AbstractConverter\ninput=RelSubset#769,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[4]true,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
        rel790 [label="rel#790:BatchPhysicalExchange\ninput=RelSubset#769,distribution=hash[SCR_INCD]\nrows=1.0E8, cost={3.0E8 rows, 1.7E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}",color=blue,shape=box]
        rel795 [label="rel#795:BatchPhysicalCalc\ninput=RelSubset#793,select=CASE(IS NULL(_AST_TYPE), 1:DECIMAL(10, 0), CAST(_AST_TYPE)) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CASE(IS NULL(_SCR_TYPE), 1:DECIMAL(10, 0), CAST(_SCR_TYPE)) AS SCR_TYPE, TSCP\nrows=1.0E8, cost={3.0E8 rows, 1.7E10 cpu, 8.4E9 io, 8.4E9 network, 0.0 memory}",shape=box]
        subset753 [label="rel#753:RelSubset#21.LOGICAL.any.[]"]
        subset769 [label="rel#769:RelSubset#21.BATCH_PHYSICAL.any.[]"]
        subset773 [label="rel#773:RelSubset#21.BATCH_PHYSICAL.hash[4]true.[]"]
        subset769 -> subset773;    }
    subgraph cluster22{
        label="Set 22 RecordType(DECIMAL(4, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(8, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)";
        rel710 [label="rel#710:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, TABLE_B],fields=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset754 [label="rel#754:RelSubset#22.LOGICAL.any.[]"]
        subset770 [label="rel#770:RelSubset#22.BATCH_PHYSICAL.any.[]",color=red]
        subset799 [label="rel#799:RelSubset#22.BATCH_PHYSICAL.hash[4]true.[]",color=red]
        subset770 -> subset799;    }
    subgraph cluster23{
        label="Set 23 RecordType(DECIMAL(19, 0) SCR_INCD)";
        rel755 [label="rel#755:FlinkLogicalCalc\ninput=RelSubset#754,select=SCR_INCD\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 8.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel771 [label="rel#771:BatchPhysicalCalc\ninput=RelSubset#770,select=SCR_INCD\nrows=1.0E8, cost={inf}",shape=box]
        rel776 [label="rel#776:AbstractConverter\ninput=RelSubset#772,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
        rel783 [label="rel#783:AbstractConverter\ninput=RelSubset#772,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
        rel798 [label="rel#798:AbstractConverter\ninput=RelSubset#775,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=broadcast,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
        rel797 [label="rel#797:BatchPhysicalExchange\ninput=RelSubset#772,distribution=hash[SCR_INCD]\nrows=1.0E8, cost={inf}",shape=box]
        rel800 [label="rel#800:BatchPhysicalCalc\ninput=RelSubset#799,select=SCR_INCD\nrows=1.0E8, cost={inf}",shape=box]
        rel803 [label="rel#803:AbstractConverter\ninput=RelSubset#782,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
        rel802 [label="rel#802:BatchPhysicalExchange\ninput=RelSubset#772,distribution=broadcast\nrows=1.0E8, cost={inf}",shape=box]
        rel805 [label="rel#805:BatchPhysicalExchange\ninput=RelSubset#775,distribution=broadcast\nrows=1.0E8, cost={inf}",shape=box]
        rel807 [label="rel#807:BatchPhysicalExchange\ninput=RelSubset#782,distribution=hash[SCR_INCD]\nrows=1.0E8, cost={inf}",shape=box]
        subset756 [label="rel#756:RelSubset#23.LOGICAL.any.[]"]
        subset772 [label="rel#772:RelSubset#23.BATCH_PHYSICAL.any.[]"]
        subset775 [label="rel#775:RelSubset#23.BATCH_PHYSICAL.hash[0]true.[]"]
        subset782 [label="rel#782:RelSubset#23.BATCH_PHYSICAL.broadcast.[]"]
        subset772 -> subset775;        subset772 -> subset782;    }
    subgraph cluster24{
        label="Set 24 RecordType(DECIMAL(10, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(10, 0) SCR_TYPE, DECIMAL(20, 4) TSCP, DECIMAL(19, 0) SCR_INCD0)";
        rel757 [label="rel#757:FlinkLogicalJoin\nleft=RelSubset#753,right=RelSubset#756,condition==($4, $7),joinType=inner\nrows=3.0E7, cost={5.0E8 rows, 6.0E8 cpu, 2.53E10 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel777 [label="rel#777:BatchPhysicalHashJoin\nleft=RelSubset#773,right=RelSubset#775,joinType=InnerJoin,where==(SCR_INCD, SCR_INCD0),select=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP, SCR_INCD0,build=right\nrows=3.0E7, cost={inf}",shape=box]
        rel780 [label="rel#780:BatchPhysicalSortMergeJoin\nleft=RelSubset#773,right=RelSubset#775,joinType=InnerJoin,where==(SCR_INCD, SCR_INCD0),select=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP, SCR_INCD0\nrows=3.0E7, cost={inf}",shape=box]
        rel784 [label="rel#784:BatchPhysicalNestedLoopJoin\nleft=RelSubset#769,right=RelSubset#782,joinType=InnerJoin,where==(SCR_INCD, SCR_INCD0),select=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP, SCR_INCD0,build=right\nrows=3.0E7, cost={inf}",shape=box]
        subset758 [label="rel#758:RelSubset#24.LOGICAL.any.[]"]
        subset778 [label="rel#778:RelSubset#24.BATCH_PHYSICAL.any.[]"]
    }
    subgraph cluster25{
        label="Set 25 RecordType(DECIMAL(4, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(8, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)";
        rel759 [label="rel#759:FlinkLogicalCalc\ninput=RelSubset#758,select=CAST(AST_TYPE) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CAST(SCR_TYPE) AS SCR_TYPE, TSCP\nrows=3.0E7, cost={5.3E8 rows, 6.0E8 cpu, 2.53E10 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel786 [label="rel#786:BatchPhysicalCalc\ninput=RelSubset#778,select=CAST(AST_TYPE) AS AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, CAST(SCR_TYPE) AS SCR_TYPE, TSCP\nrows=3.0E7, cost={inf}",shape=box]
        subset760 [label="rel#760:RelSubset#25.LOGICAL.any.[]"]
        subset787 [label="rel#787:RelSubset#25.BATCH_PHYSICAL.any.[]"]
    }
    subgraph cluster26{
        label="Set 26 RecordType(DECIMAL(4, 0) AST_TYPE, DECIMAL(20, 4) CIR_CAPT, DECIMAL(20, 4) FULL_MKT_CIR_CAPT, DECIMAL(20, 4) FULL_MKT_TSCP, DECIMAL(19, 0) SCR_INCD, DECIMAL(8, 0) SCR_TYPE, DECIMAL(20, 4) TSCP)";
        rel762 [label="rel#762:FlinkLogicalSink\ninput=RelSubset#760,table=default_catalog.default_database.TABLE_B,fields=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP\nrows=3.0E7, cost={5.6E8 rows, 6.3E8 cpu, 2.53E10 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel765 [label="rel#765:AbstractConverter\ninput=RelSubset#763,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=3.0E7, cost={inf}",shape=box]
        rel788 [label="rel#788:BatchPhysicalSink\ninput=RelSubset#787,table=default_catalog.default_database.TABLE_B,fields=AST_TYPE, CIR_CAPT, FULL_MKT_CIR_CAPT, FULL_MKT_TSCP, SCR_INCD, SCR_TYPE, TSCP\nrows=3.0E7, cost={inf}",shape=box]
        subset763 [label="rel#763:RelSubset#26.LOGICAL.any.[]"]
        subset764 [label="rel#764:RelSubset#26.BATCH_PHYSICAL.any.[]"]
    }
    root -> subset764;
    subset751 -> rel707[color=blue];
    subset767 -> rel766[color=blue];
    subset793 -> rel794; rel794 -> subset767;
    subset793 -> rel809[color=blue]; rel809 -> subset767[color=blue];
    subset753 -> rel752[color=blue]; rel752 -> subset751[color=blue];
    subset769 -> rel768[color=blue]; rel768 -> subset767[color=blue];
    subset773 -> rel774; rel774 -> subset769;
    subset773 -> rel790[color=blue]; rel790 -> subset769[color=blue];
    subset773 -> rel795; rel795 -> subset793;
    subset754 -> rel710[color=blue];
    subset756 -> rel755[color=blue]; rel755 -> subset754[color=blue];
    subset772 -> rel771; rel771 -> subset770;
    subset775 -> rel776; rel776 -> subset772;
    subset782 -> rel783; rel783 -> subset772;
    subset782 -> rel798; rel798 -> subset775;
    subset775 -> rel797; rel797 -> subset772;
    subset775 -> rel800; rel800 -> subset799;
    subset775 -> rel803; rel803 -> subset782;
    subset782 -> rel802; rel802 -> subset772;
    subset782 -> rel805; rel805 -> subset775;
    subset775 -> rel807; rel807 -> subset782;
    subset758 -> rel757[color=blue]; rel757 -> subset753[color=blue,label="0"]; rel757 -> subset756[color=blue,label="1"];
    subset778 -> rel777; rel777 -> subset773[label="0"]; rel777 -> subset775[label="1"];
    subset778 -> rel780; rel780 -> subset773[label="0"]; rel780 -> subset775[label="1"];
    subset778 -> rel784; rel784 -> subset769[label="0"]; rel784 -> subset782[label="1"];
    subset760 -> rel759[color=blue]; rel759 -> subset758[color=blue];
    subset787 -> rel786; rel786 -> subset778;
    subset763 -> rel762[color=blue]; rel762 -> subset760[color=blue];
    subset764 -> rel765; rel765 -> subset763;
    subset764 -> rel788; rel788 -> subset787;
}
    at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
    at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
    ... 28 more

{code}
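
In the "Sets:" part of the dump above, the subsets for which the planner found no usable plan are the ones printed with best=null (for example rel#770 and rel#799 in Set#22). As a reading aid only, here is a minimal sketch that lists those subsets from a dump held in a string. The class EmptySubsets and its method find are hypothetical names, not part of Flink or Calcite, and the regular expression assumes the exact line layout shown in this report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for reading a planner dump; not a Flink or Calcite API. */
final class EmptySubsets {
    // Assumed layout: "rel#770:RelSubset#22.BATCH_PHYSICAL.any.[], best=null"
    private static final Pattern NO_BEST = Pattern.compile(
            "^\\s*(rel#\\d+:RelSubset#\\d+\\.\\S+), best=null\\s*$",
            Pattern.MULTILINE);

    /** Returns the subset descriptors whose line ends in "best=null". */
    static List<String> find(String dump) {
        List<String> result = new ArrayList<>();
        Matcher m = NO_BEST.matcher(dump);
        while (m.find()) {
            result.add(m.group(1));
        }
        return result;
    }
}
```

Calling EmptySubsets.find on the exception message should return entries such as rel#770:RelSubset#22.BATCH_PHYSICAL.any.[], which point at the scan of TABLE_B as the first place where no physical plan was produced.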


