Posted to issues@flink.apache.org by "zoucao (Jira)" <ji...@apache.org> on 2022/06/16 03:58:00 UTC
[jira] [Comment Edited] (FLINK-27953) using the original order to add the primary key in PushProjectIntoTableSourceScanRule
[ https://issues.apache.org/jira/browse/FLINK-27953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554894#comment-17554894 ]
zoucao edited comment on FLINK-27953 at 6/16/22 3:57 AM:
---------------------------------------------------------
Hi [~godfreyhe], do you have time to take a look?
was (Author: zoucao):
Hi [~godfrey], do you have time to take a look?
> using the original order to add the primary key in PushProjectIntoTableSourceScanRule
> -------------------------------------------------------------------------------------
>
> Key: FLINK-27953
> URL: https://issues.apache.org/jira/browse/FLINK-27953
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.14.4
> Reporter: zoucao
> Priority: Major
>
> In PushProjectIntoTableSourceScanRule, if the source produces a changelog stream, the primary key fields are appended to the end of the projected fields, as the following example shows:
> {code:java}
> StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
> TableEnvironment tEnv = util.getTableEnv();
> String srcTableDdl =
> "CREATE TABLE fs (\n"
> + " a bigint,\n"
> + " b int,\n"
> + " c varchar,\n"
> + " d int,\n"
> + " e int,\n"
> + " primary key (a,b) not enforced \n"
> + ") with (\n"
> + " 'connector' = 'values',\n"
> + " 'disable-lookup'='true',\n"
> + " 'changelog-mode' = 'I,UB,UA,D')";
> tEnv.executeSql(srcTableDdl);
> tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
> {code}
> {code:java}
> System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));
> projected list:
> [[0],[1],[2]]
> == Optimized Execution Plan ==
> Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
> +- ChangelogNormalize(key=[a, b])
>    +- Exchange(distribution=[hash[a, b]])
>       +- Calc(select=[a, b, c], where=[(b = 0)])
>          +- DropUpdateBefore
>             +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
> {code}
> {code:java}
> System.out.println(tEnv.explainSql("select a, c from fs where c > 0"));
> projected list:
> [[0],[2],[1]]
> == Optimized Execution Plan ==
> Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
> +- ChangelogNormalize(key=[a, b])
>    +- Exchange(distribution=[hash[a, b]])
>       +- DropUpdateBefore
>          +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
> {code}
> Field b is not referenced in
> {code:sql}
> select a, c from fs where c > 0{code}
> but it is part of the primary key, so when 'table.exec.source.cdc-events-duplicate' is enabled it is appended to the end of the projected list. Adding or removing a condition on field b therefore changes the output type of the source: the deduplication node (ChangelogNormalize in the plans above) receives a different input type, its state serializer changes as well, and the state becomes incompatible.
> I think we can add the primary key fields to the projected list in their original order from the source table.
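
One possible reading of this proposal, as a minimal sketch rather than the planner's actual code: merge the projected field indices with the primary key indices and order the result by position in the source schema. The class and method names below (ProjectionOrderSketch, appendKeysAtEnd, mergeInSchemaOrder) are hypothetical and do not exist in Flink, and only top-level field indices are handled, whereas the projected list in the issue holds index paths.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration only; not part of Flink's planner. */
public class ProjectionOrderSketch {

    /** Behavior described in the issue: missing key fields go to the end. */
    static int[] appendKeysAtEnd(int[] projected, int[] primaryKeys) {
        Set<Integer> result = new LinkedHashSet<>();
        for (int field : projected) {
            result.add(field);
        }
        for (int key : primaryKeys) {
            result.add(key); // no-op if the key field is already projected
        }
        return result.stream().mapToInt(Integer::intValue).toArray();
    }

    /** Assumed fix: order all required fields by their source schema position. */
    static int[] mergeInSchemaOrder(int[] projected, int[] primaryKeys) {
        Set<Integer> result = new TreeSet<>();
        for (int field : projected) {
            result.add(field);
        }
        for (int key : primaryKeys) {
            result.add(key);
        }
        return result.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        int[] primaryKeys = {0, 1};   // a, b
        int[] withoutB = {0, 2};      // query references a, c
        int[] withB = {0, 1, 2};      // query references a, b, c

        // Appending at the end yields a layout that depends on the query.
        System.out.println(Arrays.toString(appendKeysAtEnd(withoutB, primaryKeys))); // [0, 2, 1]
        System.out.println(Arrays.toString(appendKeysAtEnd(withB, primaryKeys)));    // [0, 1, 2]

        // Ordering by schema position yields the same layout for both queries.
        System.out.println(Arrays.toString(mergeInSchemaOrder(withoutB, primaryKeys))); // [0, 1, 2]
        System.out.println(Arrays.toString(mergeInSchemaOrder(withB, primaryKeys)));    // [0, 1, 2]
    }
}
```

With schema ordering, both example queries would scan the fields as a, b, c, so the node keyed on (a, b) would see the same input type in either case. Note that this sketch also reorders the non-key projected fields, which may or may not be what the actual fix should do.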