Posted to issues@flink.apache.org by "Alexander Smirnov (Jira)" <ji...@apache.org> on 2023/04/06 07:03:00 UTC

[jira] [Comment Edited] (FLINK-31729) Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN

    [ https://issues.apache.org/jira/browse/FLINK-31729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709232#comment-17709232 ] 

Alexander Smirnov edited comment on FLINK-31729 at 4/6/23 7:02 AM:
-------------------------------------------------------------------

[~luoyuxia] Yes, according to the expected logic it should always be -D, +I, -D, +I. Here is the current JavaDoc above StreamingJoinOperator#processElement, which states that the RowKind is forwarded only in INNER JOIN.
!image-2023-04-06-14-02-30-826.png!

However, I decided to implement ticket https://issues.apache.org/jira/browse/FLINK-17337 right away (emit -U/+U not only in inner join), because with the new logic the bug will no longer be present. Here is the PR: [https://github.com/apache/flink/pull/22351]


was (Author: JIRAUSER288574):
[~luoyuxia] Yes, according to the expected logic it should always be -D, +I, -D, +I. Here is the current JavaDoc above StreamingJoinOperator#processElement, which states that the RowKind is forwarded only in INNER JOIN. !image-2023-04-06-13-56-21-971.png!

However, I decided to implement ticket https://issues.apache.org/jira/browse/FLINK-17337 right away (emit -U/+U not only in inner join), because with the new logic the bug will no longer be present. Here is the PR: [https://github.com/apache/flink/pull/22351]

> Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN
> ---------------------------------------------------------
>
>                 Key: FLINK-31729
>                 URL: https://issues.apache.org/jira/browse/FLINK-31729
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Alexander Smirnov
>            Priority: Minor
>             Fix For: 1.18.0
>
>         Attachments: image-2023-04-05-00-08-32-984.png, image-2023-04-06-13-55-42-242.png, image-2023-04-06-13-56-08-998.png, image-2023-04-06-13-56-21-971.png, image-2023-04-06-14-01-07-871.png, image-2023-04-06-14-02-30-826.png
>
>
> Currently, in streaming LEFT/RIGHT/FULL OUTER JOIN, Flink SQL doesn't emit UPDATE_BEFORE/UPDATE_AFTER records, but instead explicitly changes the RowKind of output records to INSERT/DELETE for simplicity. However, this doesn't work as expected, because UPDATE_BEFORE rows are sometimes emitted. Even more confusing, the UPDATE_BEFORE record is followed by an INSERT record (not UPDATE_AFTER), which can cause bugs when downstream operators process UPDATE records differently from INSERT/DELETE (for example, an operator can assume that an UPDATE_BEFORE record will be followed by an UPDATE_AFTER record at some point).
> How to reproduce:
> Suppose we have tables "source1" and "source2":
> CREATE TABLE source1(
>   id int PRIMARY KEY NOT ENFORCED,
>   c3 bigint
> ) WITH (
>   'connector' = 'kafka',
>    ...
>   'format' = 'debezium-json'
> );
>  
> CREATE TABLE source2(
>   id int PRIMARY KEY NOT ENFORCED,
>   c3 bigint
> ) WITH (
>   'connector' = 'kafka',
>    ...
>   'format' = 'debezium-json'
> );
> And we execute the following query:
> "select t1.id, t1.c3, t2.id, t2.c3 from source1 t1 left join source2 t2 on t1.id = t2.id"
> Then we insert records one by one:
> source1: 
> {noformat}
> {"before":null,"after":{"id":2,"c3":7121},"op":"c"}{noformat}
> source2: 
> {noformat}
> {"before":null,"after":{"id":2,"c3":364},"op":"c"}{noformat}
> source1: 
> {noformat}
> {"before":{"id":2,"c3":7121},"after":{"id":2,"c3":7222},"op":"u"}{noformat}
> source2: 
> {noformat}
> {"before":{"id":2,"c3":364},"after":{"id":2,"c3":564},"op":"u"}{noformat}
> The result will be as in the following screenshot:
> !image-2023-04-05-00-08-32-984.png!
> Note that once ticket https://issues.apache.org/jira/browse/FLINK-17337 (support emitting UPDATE_BEFORE/UPDATE_AFTER records not only in inner join) is implemented, the described bug will no longer be relevant.
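
The downstream assumption mentioned in the description (an UPDATE_BEFORE record is followed by an UPDATE_AFTER record) can be illustrated with a small standalone sketch. This is not Flink code: the RowKind enum below is a local stand-in for Flink's RowKind, the helper names to_insert_delete and unpaired_update_before are hypothetical, and the check is deliberately stricter than "at some point" because it only looks at the immediately following record.

```python
from enum import Enum


class RowKind(Enum):
    # Local stand-in for Flink's RowKind; values use the changelog
    # notation from this thread (+I, -U, +U, -D).
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def to_insert_delete(kind):
    """Model of the outer-join simplification: -U becomes -D, +U becomes +I."""
    if kind is RowKind.UPDATE_BEFORE:
        return RowKind.DELETE
    if kind is RowKind.UPDATE_AFTER:
        return RowKind.INSERT
    return kind


def unpaired_update_before(kinds):
    """Return the indexes of -U records not immediately followed by +U.

    Simplifying assumption: all records belong to the same key.
    """
    bad = []
    for i, kind in enumerate(kinds):
        if kind is RowKind.UPDATE_BEFORE:
            nxt = kinds[i + 1] if i + 1 < len(kinds) else None
            if nxt is not RowKind.UPDATE_AFTER:
                bad.append(i)
    return bad


# Shape reported in the issue: a -U followed by +I instead of +U.
reported = [RowKind.UPDATE_BEFORE, RowKind.INSERT]

# The same records after consistently rewriting kinds to INSERT/DELETE.
normalized = [to_insert_delete(k) for k in reported]

print(unpaired_update_before(reported))    # index 0 is flagged
print(unpaired_update_before(normalized))  # nothing is flagged
```

A downstream consumer built on such a pairing assumption would trip on the reported sequence, while a stream that is consistently INSERT/DELETE (or consistently paired -U/+U, as proposed in FLINK-17337) would pass.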


