Posted to user-zh@flink.apache.org by 徐州州 <25...@qq.com> on 2021/06/01 08:23:46 UTC

Flink SQL 1.13.1: joining changelog stream data causes data loss, very serious!!!

The logic currently used in our project:

|insert into dwd_order_detail
|select
|   ord.Id,
|   ord.Code,
|   Status,
|     concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id  as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd'))  as uuids,
|     TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd')) as As_Of_Date
|from
|orders ord
|left join order_extend oed on  ord.Id=oed.OrderId and oed.IsDeleted=0 and oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
|where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
|or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
|or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd') AS TIMESTAMP)
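|) and ord.IsDeleted=0;

This is my logic for joining canal-json stream data from Kafka and writing the result into an upsert-kafka table. I have confirmed that the problem also exists in 1.12; I have just upgraded from 1.12 to 1.13.

I looked up the order data of one user. Order number XJ0120210531004794 shows up as +U in the canal-json source table, which is normal:

| +U |             XJ0120210531004704 |          50 |
| +U |             XJ0120210531004788 |          50 |
| +U |             XJ0120210531004819 |          50 |
| +U |             XJ0120210531004667 |          50 |
| +U |             XJ0120210531004695 |          50 |
| +U |             XJ0120210531004776 |          50 |
| +U |             XJ0120210531004784 |          50 |
| +U |             XJ0120210531004861 |          50 |
| +U |             XJ0120210531004794 |          50 |
| +U |             XJ0120210531004672 |          50 |
| +U |             XJ0120210531004766 |          50 |
| +U |             XJ0120210531004806 |          50 |
| +U |             XJ0120210531004812 |          50 |
| +U |             XJ0120210601000126 |          50 |
| +U |             XJ0120210601000179 |          50 |
| +U |             XJ0120210531004816 |          50 |
| +U |             XJ0120210601000927 |          50 |

But after the join result is written to upsert-kafka, the data consumed back from upsert-kafka is:

| +I |             XJ0120210531004794 |          50 |
| -U |             XJ0120210531004794 |          50 |

The order comes out as these two records. This order has not been changed in either the orders table or the order_extend table since it was created. The -U record causes the row to be lost, so it is not included in my calculation. I hope someone can help explain this.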
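
To make the symptom concrete, here is a minimal Python sketch of what happens when a consumer applies these changelog rows to a keyed view. This is not Flink code: the function name `materialize`, the row layout, and the retraction rule are assumptions made for illustration only. It shows that a +I followed by a -U with no matching +U leaves no row for the order.

```python
from typing import Dict, Iterable, Tuple

# (row kind, order code, status) as printed by the SQL client above.
Row = Tuple[str, str, int]


def materialize(changelog: Iterable[Row]) -> Dict[str, int]:
    """Apply changelog rows to a view keyed by order code (illustrative only)."""
    view: Dict[str, int] = {}
    for kind, code, status in changelog:
        if kind in ("+I", "+U"):
            # Insert or update-after: the row becomes the current value.
            view[code] = status
        elif kind in ("-U", "-D"):
            # Update-before or delete: retract the row if it is the current one.
            if view.get(code) == status:
                del view[code]
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return view


# The two records consumed from upsert-kafka for this order.
observed = [
    ("+I", "XJ0120210531004794", 50),
    ("-U", "XJ0120210531004794", 50),
]

# Hypothetical complete update: the -U is followed by a matching +U.
with_update_after = observed + [("+U", "XJ0120210531004794", 50)]

print(materialize(observed))           # {}
print(materialize(with_update_after))  # {'XJ0120210531004794': 50}
```

With only the two observed records the view ends up empty, which matches the lost order described above. The row is present only if a +U follows the -U.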