Posted to issues@flink.apache.org by "zoucao (Jira)" <ji...@apache.org> on 2022/06/08 08:45:00 UTC
[jira] [Created] (FLINK-27953) using the original order to add the primary key in PushProjectIntoTableSourceScanRule
zoucao created FLINK-27953:
------------------------------
Summary: using the original order to add the primary key in PushProjectIntoTableSourceScanRule
Key: FLINK-27953
URL: https://issues.apache.org/jira/browse/FLINK-27953
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.14.4
Reporter: zoucao
In PushProjectIntoTableSourceScanRule, if the source produces a changelog stream, the primary key fields are appended to the end of the projected fields. The following test setup and queries show this:
{code:java}
StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
TableEnvironment tEnv = util.getTableEnv();
String srcTableDdl =
"CREATE TABLE fs (\n"
+ " a bigint,\n"
+ " b int,\n"
+ " c varchar,\n"
+ " d int,\n"
+ " e int,\n "
+ " primary key (a,b) not enforced \n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'disable-lookup'='true',\n"
+ " 'changelog-mode' = 'I,UB,UA,D')";
tEnv.executeSql(srcTableDdl);
tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
{code}
{code:java}
System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));
projected list:
[[0],[1],[2]]
== Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
      +- Calc(select=[a, b, c], where=[(b = 0)])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
{code}
{code:java}
System.out.println(tEnv.explainSql("select a, c from fs where c > 0"));
projected list:
[[0],[2],[1]]
== Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
      +- DropUpdateBefore
         +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
{code}
Field b is not referenced by
{code:sql}
select a, c from fs where c > 0{code}
but it is part of the primary key, so when 'table.exec.source.cdc-events-duplicate' is enabled it is appended to the end of the projected list. As a result, adding or removing a condition on field b changes the output type of the source ([a, b, c] versus [a, c, b]). The deduplication node (ChangelogNormalize in the plans above) then receives a different input type, its state serializer changes as well, and the state becomes incompatible.
I think we can add the primary key fields to the projected list in the order in which they appear in the source table.
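The difference between the two strategies can be sketched as follows. This is a minimal illustration, not the code of PushProjectIntoTableSourceScanRule: the class and method names are hypothetical, only top-level field indices are handled (nested projection paths are ignored), and "original order" is interpreted here as sorting the merged indices by their position in the source schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, for illustration only; not part of Flink.
public class ProjectionOrderSketch {

    /** Behavior described in this issue: missing primary key fields are appended at the end. */
    public static List<Integer> appendKeysAtEnd(List<Integer> projected, List<Integer> primaryKey) {
        List<Integer> result = new ArrayList<>(projected);
        for (int key : primaryKey) {
            if (!result.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    /** Suggested behavior (one interpretation): merge both lists and keep source schema order. */
    public static List<Integer> mergeInSchemaOrder(List<Integer> projected, List<Integer> primaryKey) {
        // A TreeSet removes duplicates and iterates in ascending index order.
        TreeSet<Integer> merged = new TreeSet<>(projected);
        merged.addAll(primaryKey);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        // Schema of table fs: a=0, b=1, c=2, d=3, e=4; primary key (a, b).
        List<Integer> primaryKey = Arrays.asList(0, 1);
        // "select a, c from fs where c > 0" references only a and c.
        List<Integer> used = Arrays.asList(0, 2);

        System.out.println(appendKeysAtEnd(used, primaryKey));    // [0, 2, 1]
        System.out.println(mergeInSchemaOrder(used, primaryKey)); // [0, 1, 2]
    }
}
```

With the second strategy, both queries in this issue produce the projected list [0, 1, 2], so the input type of ChangelogNormalize would be the same for them.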