You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/04/13 06:28:05 UTC
[jira] [Updated] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Gao updated FLINK-22826:
----------------------------
Fix Version/s: 1.16.0
> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
> Key: FLINK-22826
> URL: https://issues.apache.org/jira/browse/FLINK-22826
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.0, 1.13.1
> Reporter: 徐州州
> Priority: Minor
> Labels: auto-deprioritized-major, stale-blocker
> Fix For: 1.15.0, 1.16.0
>
>
> {code:java}
> insert into dwd_order_detail
> select
> ord.Id,
> ord.Code,
> Status
> concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd')) as uuids,
> TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd')) as As_Of_Date
> from
> orders ord
> left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
> ) and ord.IsDeleted=0;
> {code}
> My upsert-kafka table for PRIMARY KEY for uuids.
> This is the logic of my kafka based canal-json stream data join and write to Upsert-kafka tables I confirm that version 1.12 also has this problem I just upgraded from 1.12 to 1.13.
> I look up a user s order data and order number XJ0120210531004794 in canal-json original table as U which is normal.
> {code:java}
> | +U | XJ0120210531004794 | 50 |
> | +U | XJ0120210531004672 | 50 |
> {code}
> But written to upsert-kakfa via join, the data consumed from upsert kafka is,
> {code:java}
> | +I | XJ0120210531004794 | 50 |
> | -U | XJ0120210531004794 | 50 |
> {code}
> The order is two records this sheet in orders and order_extend tables has not changed since created -U status caused my data loss not computed and the final result was wrong.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)