Posted to dev@flink.apache.org by "徐州州 (Jira)" <ji...@apache.org> on 2021/06/01 08:49:00 UTC

[jira] [Created] (FLINK-22826) Flink SQL 1.13.1: join on changelog stream data causes data loss

徐州州 created FLINK-22826:
---------------------------

             Summary: Flink SQL 1.13.1: join on changelog stream data causes data loss
                 Key: FLINK-22826
                 URL: https://issues.apache.org/jira/browse/FLINK-22826
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.1, 1.12.0
            Reporter: 徐州州


{code:sql}
insert into dwd_order_detail
select
   ord.Id,
   ord.Code,
   Status,
   -- sink primary key: order Id + order_extend Id (or 'oed_null') + current date
   concat(cast(ord.Id as STRING), if(oed.Id is null, 'oed_null', cast(oed.Id as STRING)), DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as uuids,
   -- current date at evaluation time, derived from LOCALTIMESTAMP
   TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as As_Of_Date
from
orders ord
-- join only non-deleted order_extend rows created on the current date
left join order_extend oed on ord.Id = oed.OrderId and oed.IsDeleted = 0 and oed.CreateTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
-- keep non-deleted orders placed, reviewed, or rejected on the current date
where ( ord.OrderTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
or ord.ReviewTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
or ord.RejectTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
) and ord.IsDeleted = 0;
{code}
My upsert-kafka sink table uses uuids as its PRIMARY KEY.
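To make the key concrete, the uuids expression from the query can be mirrored in plain Python. This is a minimal sketch, not Flink code: the helper name make_uuid and the sample Id values are hypothetical, and the date is passed in explicitly instead of being read from LOCALTIMESTAMP. It shows that the same order yields a different uuids value depending on whether a matching order_extend row has been joined and on the date at evaluation time.

{code:python}
from datetime import date
from typing import Optional


def make_uuid(ord_id: int, oed_id: Optional[int], as_of: date) -> str:
    """Hypothetical mirror of the SQL key expression:
    concat(cast(ord.Id as STRING),
           if(oed.Id is null, 'oed_null', cast(oed.Id as STRING)),
           DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'))
    Assumption: the date is a parameter here; in the query it comes
    from LOCALTIMESTAMP at evaluation time.
    """
    extend_part = "oed_null" if oed_id is None else str(oed_id)
    return f"{ord_id}{extend_part}{as_of.strftime('%Y-%m-%d')}"


if __name__ == "__main__":
    day = date(2021, 5, 31)
    # Hypothetical Ids: before and after a matching order_extend row is joined.
    print(make_uuid(1001, None, day))  # 1001oed_null2021-05-31
    print(make_uuid(1001, 42, day))    # 1001422021-05-31
    # The same rows evaluated on the next day produce yet another key.
    print(make_uuid(1001, 42, date(2021, 6, 1)))  # 1001422021-06-01
{code}

Because the parts are concatenated without a separator, different (Id, oed.Id) pairs can also map to the same key, for example (1, 12) and (11, 2) both start with "112".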

This is the logic of my job: it joins canal-json changelog streams read from Kafka and writes the result to an upsert-kafka table. I have confirmed that version 1.12 also has this problem; I just upgraded from 1.12 to 1.13.

Looking up one user's orders in the original canal-json table, order number XJ0120210531004794 appears as +U, which is normal:
{code:java}
| +U |             XJ0120210531004794 |          50 |
| +U |             XJ0120210531004672 |          50 |
{code}
But after the join writes the data to upsert-kafka, the records consumed back from the upsert-kafka table are:
{code:java}
| +I |             XJ0120210531004794 |          50 |
| -U |             XJ0120210531004794 |          50 |
{code}
This order appears as two records. Its rows in the orders and order_extend tables have not changed since they were created, yet the -U record caused the order to be lost from downstream computation, so the final result was wrong.
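The effect described above can be illustrated with a simplified model of materializing a keyed changelog. This is a sketch under stated assumptions, not the upsert-kafka implementation: +I and +U write the row for its key, -U and -D remove it, and the order code stands in for the real uuids key, which the consumed output does not show. The function name materialize is hypothetical.

{code:python}
from typing import Dict, List, Tuple

# (row kind, key, value); row kinds use Flink's changelog notation.
Change = Tuple[str, str, int]


def materialize(changes: List[Change]) -> Dict[str, int]:
    """Apply a changelog to a keyed table and return the final state.

    Simplified model (assumption): +I/+U write the row for its key,
    -U/-D delete it. An -U that is never followed by a +U for the
    same key therefore leaves the key absent.
    """
    table: Dict[str, int] = {}
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            table[key] = value
        elif kind in ("-U", "-D"):
            table.pop(key, None)
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return table


if __name__ == "__main__":
    # Records seen in the original canal-json table: both orders survive.
    source = [
        ("+U", "XJ0120210531004794", 50),
        ("+U", "XJ0120210531004672", 50),
    ]
    print(materialize(source))
    # Records consumed from upsert-kafka after the join: +I, then -U.
    sink = [
        ("+I", "XJ0120210531004794", 50),
        ("-U", "XJ0120210531004794", 50),
    ]
    print(materialize(sink))  # {}
{code}

In this model an order survives only if every -U for its key is later followed by a +U for the same key; the consumed output above contains no such +U, so the order drops out of the result.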



--
This message was sent by Atlassian Jira
(v8.3.4#803005)