Posted to issues@flink.apache.org by "zoucao (Jira)" <ji...@apache.org> on 2022/06/16 03:58:00 UTC

[jira] [Comment Edited] (FLINK-27953) using the original order to add the primary key in PushProjectIntoTableSourceScanRule

    [ https://issues.apache.org/jira/browse/FLINK-27953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554894#comment-17554894 ] 

zoucao edited comment on FLINK-27953 at 6/16/22 3:57 AM:
---------------------------------------------------------

Hi [~godfreyhe], do you have time to take a look?


was (Author: zoucao):
Hi [~godfrey], do you have time to take a look?

> using the original order to add the primary key in PushProjectIntoTableSourceScanRule
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-27953
>                 URL: https://issues.apache.org/jira/browse/FLINK-27953
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.4
>            Reporter: zoucao
>            Priority: Major
>
> In PushProjectIntoTableSourceScanRule, if the source produces a changelog stream, the primary key fields are appended to the end of the projected fields. Consider the following test setup and queries:
> {code:java}
> StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
> TableEnvironment tEnv = util.getTableEnv();
> String srcTableDdl =
>         "CREATE TABLE fs (\n"
>                 + "  a bigint,\n"
>                 + "  b int,\n"
>                 + "  c varchar,\n"
>                 + "  d int,\n"
>                 + "  e int,\n"
>                 + "  primary key (a,b) not enforced \n"
>                 + ") with (\n"
>                 + " 'connector' = 'values',\n"
>                 + " 'disable-lookup'='true',\n"
>                 + " 'changelog-mode' = 'I,UB,UA,D')";
> tEnv.executeSql(srcTableDdl);
> tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
> {code}
> {code:java}
> System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));
> projected list:
> [[0],[1],[2]]
> == Optimized Execution Plan ==
> Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
> +- ChangelogNormalize(key=[a, b])
>    +- Exchange(distribution=[hash[a, b]])
>       +- Calc(select=[a, b, c], where=[(b = 0)])
>          +- DropUpdateBefore
>             +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
> {code}
> {code:java}
> System.out.println(tEnv.explainSql("select a, c from fs where c > 0"));
> projected list:
> [[0],[2],[1]]
> == Optimized Execution Plan ==
> Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
> +- ChangelogNormalize(key=[a, b])
>    +- Exchange(distribution=[hash[a, b]])
>       +- DropUpdateBefore
>          +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
> {code}
> Field b is not involved in
> {code:sql}
> select a, c from fs where c > 0{code}
> , but it is part of the primary key, so it is appended to the end of the projected list when 'table.exec.source.cdc-events-duplicate' is enabled. Adding or removing a condition on field b therefore changes the output type of the source scan ([a, b, c] versus [a, c, b] above). The deduplication node (ChangelogNormalize in the plans above) then receives a different input type and its state serializer changes as well, leading to state incompatibility.
> I think we can use the original order from the source table when adding the primary key to the projected list.
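
To make the proposal concrete, here is a minimal sketch comparing the two orderings. It is not the planner's actual code: the class and method names are hypothetical, and fields are modeled as flat top-level indices rather than the nested index paths shown in the projected lists above. It also assumes that "original order" can be modeled by sorting the indices ascending, which would also reorder a projection that was not already in source-table order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not part of Flink.
public class PrimaryKeyProjection {

    // Behavior described in the issue: keep the projected fields as they are
    // and append every missing primary key field at the end.
    static List<Integer> appendMissingKeys(List<Integer> projected, List<Integer> primaryKey) {
        List<Integer> result = new ArrayList<>(projected);
        for (Integer key : primaryKey) {
            if (!result.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    // Proposed behavior (simplified): add the missing primary key fields, then
    // order all fields by their position in the source table.
    static List<Integer> addKeysInSourceOrder(List<Integer> projected, List<Integer> primaryKey) {
        List<Integer> result = appendMissingKeys(projected, primaryKey);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        // Table fs: a=0, b=1, c=2, d=3, e=4; primary key (a, b).
        List<Integer> primaryKey = Arrays.asList(0, 1);
        // "select a, c from fs where c > 0" only uses a and c.
        List<Integer> projected = Arrays.asList(0, 2);

        System.out.println(appendMissingKeys(projected, primaryKey));    // [0, 2, 1]
        System.out.println(addKeysInSourceOrder(projected, primaryKey)); // [0, 1, 2]
    }
}
```

Under these assumptions the second query would scan [a, b, c], the same field order as the query that also filters on b, so the input type of ChangelogNormalize would not depend on whether b appears in the query.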



--
This message was sent by Atlassian Jira
(v8.20.7#820007)