Posted to user-zh@flink.apache.org by 徐州州 <25...@qq.com> on 2021/06/01 08:23:46 UTC
Flink SQL 1.13.1: join on changelog stream data causes data loss, very serious!!!
The logic currently used in the project:

insert into dwd_order_detail
select
  ord.Id,
  ord.Code,
  Status,
  concat(cast(ord.Id as String), if(oed.Id is null, 'oed_null', cast(oed.Id as STRING)), DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as uuids,
  TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as As_Of_Date
from orders ord
left join order_extend oed
  on ord.Id = oed.OrderId
  and oed.IsDeleted = 0
  and oed.CreateTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
where (
  ord.OrderTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
  or ord.ReviewTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
  or ord.RejectTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
) and ord.IsDeleted = 0;

This is my logic for joining canal-json changelog streams from Kafka and writing the result into an upsert-kafka table. I have confirmed the same problem also exists in 1.12; I just upgraded from 1.12 to 1.13. I looked up the order data of one user: order number XJ0120210531004794 shows up as +U in the original canal-json source table, which is correct:

| +U | XJ0120210531004704 | 50 |
| +U | XJ0120210531004788 | 50 |
| +U | XJ0120210531004819 | 50 |
| +U | XJ0120210531004667 | 50 |
| +U | XJ0120210531004695 | 50 |
| +U | XJ0120210531004776 | 50 |
| +U | XJ0120210531004784 | 50 |
| +U | XJ0120210531004861 | 50 |
| +U | XJ0120210531004794 | 50 |
| +U | XJ0120210531004672 | 50 |
| +U | XJ0120210531004766 | 50 |
| +U | XJ0120210531004806 | 50 |
| +U | XJ0120210531004812 | 50 |
| +U | XJ0120210601000126 | 50 |
| +U | XJ0120210601000179 | 50 |
| +U | XJ0120210531004816 | 50 |
| +U | XJ0120210601000927 | 50 |

But after the join result is written into upsert-kafka, the data consumed back from the upsert-kafka table for this order is:

| +I | XJ0120210531004794 | 50 |
| -U | XJ0120210531004794 | 50 |

The order appears as these two records, even though it has never changed in the orders and order_extend tables since it was created. The trailing -U record causes the row to be dropped and left out of my computation. I hope someone can shed some light on this.
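For context, a minimal sketch of the table definitions the query above assumes. Only the table and column names that appear in the query are taken from the post; the column types, the sink primary key, and all connector options (topic names, bootstrap servers, formats) are illustrative guesses, not the actual DDL:

-- Hypothetical canal-json source read from Kafka (order_extend would be
-- defined similarly); types and connector options are assumed for
-- illustration only.
CREATE TABLE orders (
  Id          BIGINT,
  Code        STRING,
  Status      INT,
  OrderTime   TIMESTAMP(3),
  ReviewTime  TIMESTAMP(3),
  RejectTime  TIMESTAMP(3),
  IsDeleted   INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '<broker>',
  'format' = 'canal-json'
);

-- Hypothetical upsert-kafka sink; upsert-kafka requires a PRIMARY KEY,
-- and delete rows for a key are written as tombstone (null-value) messages
-- on that key. The choice of uuids as the key is an assumption here.
CREATE TABLE dwd_order_detail (
  Id          BIGINT,
  Code        STRING,
  Status      INT,
  uuids       STRING,
  As_Of_Date  DATE,
  PRIMARY KEY (uuids) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dwd_order_detail',
  'properties.bootstrap.servers' = '<broker>',
  'key.format' = 'json',
  'value.format' = 'json'
);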