You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2024/03/18 05:34:00 UTC

[jira] [Commented] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

    [ https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827843#comment-17827843 ] 

lincoln lee commented on FLINK-34702:
-------------------------------------

[~jackylau] Thank you for reporting this! 
This should be an improvement but not a bug. Currently, some operators do not support processing changelog input, and `StreamPhysicalDeduplicate` is one of them.
For the solution, can we consider taking a step forward, allowing `StreamPhysicalDeduplicate` to support changelog input (let the physical operator support this input, just like we added support changelog input for Window TVF Aggregation in 1.19), instead of patching with a switch during the optimization phase to bypass it, WDYT?

> Rank should not convert to StreamExecDuplicate when the input is not insert only
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-34702
>                 URL: https://issues.apache.org/jira/browse/FLINK-34702
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Priority: Major
>             Fix For: 1.20.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> because the StreamPhysicalDeduplicate can not consuming update changes now while StreamExecRank can.
> so we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in this case. and we can defer whether input contains update change in the "optimize the physical plan" phase. 
> so we can add an option to solve it. and when the StreamPhysicalDeduplicate can support consuming update changes , we can deprecate it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)