You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zoucao (Jira)" <ji...@apache.org> on 2022/06/08 08:45:00 UTC

[jira] [Created] (FLINK-27953) using the original order to add the primary key in PushProjectIntoTableSourceScanRule

zoucao created FLINK-27953:
------------------------------

             Summary: using the original order to add the primary key in PushProjectIntoTableSourceScanRule
                 Key: FLINK-27953
                 URL: https://issues.apache.org/jira/browse/FLINK-27953
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.14.4
            Reporter: zoucao


In PushProjectIntoTableSourceScanRule, if the source produces a changelog stream, the primary key will be added to the end of projected fields, see the following SQL:
{code:java}
StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
        TableEnvironment tEnv = util.getTableEnv();
        String srcTableDdl =
                "CREATE TABLE fs (\n"
                        + "  a bigint,\n"
                        + "  b int,\n"
                        + "  c varchar,\n"
                        + "  d int,\n"
                        + "  e int,\n "
                        + "  primary key (a,b) not enforced \n"
                        + ") with (\n"
                        + " 'connector' = 'values',\n"
                        + " 'disable-lookup'='true',\n"
                        + " 'changelog-mode' = 'I,UB,UA,D')";
        tEnv.executeSql(srcTableDdl);
        tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
{code}
{code:java}
 System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));

projected list:
[[0],[1],[2]]

== Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
      +- Calc(select=[a, b, c], where=[(b = 0)])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
{code}
{code:java}
 System.out.println(tEnv.explainSql("select a, c from fs where c > 0")); 

projected list:
[[0],[2],[1]]

 == Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
      +- DropUpdateBefore
         +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
{code}
Field b is not involved in
{code:sql}
select a, c from fs where c > 0{code}
, but it is a primary key, so we add it to the end of projected list, If 'table.exec.source.cdc-events-duplicate' is enabled. The condition about field b will change output type, that says the duplicate node will get the different input type, and the state serializer will also be changed, leading to state incompatibility.

I think we can use the original order from the source table to add the primary key to projected list.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)