Posted to user-zh@flink.apache.org by 赵旭晨 <jj...@163.com> on 2022/03/15 04:25:18 UTC
Flink SQL with the Kafka connector (custom CDC format): consumed delete records are not reflected in the computation
Flink version: 1.14.3. The scenario is as follows:
SQL:
set table.exec.state.ttl=1 day;
describe t_k_chargeorder;
describe t_k_appointment;
SELECT
  ReportTime,
  sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount ) kpitotalcount,
  sum( InsertActualPriceCount ) InsertActualPriceCount,
  sum( InsertAppointmentCount ) InsertAppointmentCount,
  sum( InsertChargeOrderCount ) InsertChargeOrderCount,
  now() LastUpdatedDT
from
(
  SELECT
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) ReportTime,
    sum( actualprice ) InsertActualPriceCount,
    0 InsertShortMessageCount,
    0 InsertAppointmentCount,
    0 InsertImageCount,
    0 InsertChargeOrderCount,
    0 InsertPerioExamCount,
    0 InsertMedicalCount,
    0 InsertPatientCount,
    0 InsertGeneralExamCount,
    0 InsertFollowupCount
  FROM
    --effective_chargeorder t
    (SELECT
       o.recordcreatedtime,
       o.recordcreateduser,
       o.status,
       o._is_delete,
       o.appointmentid,
       o.id,
       o.tenantid,
       o.actualprice,
       o.proc_time,
       t.Name,
       t.IsInactive
     FROM
       t_k_chargeorder AS o
       INNER JOIN t_dental_tenant FOR SYSTEM_TIME AS OF o.proc_time AS t ON o.tenantid = t.Id
     WHERE
       t.IsInactive = '0'
       AND o.recordcreateduser > 0
       -- excluded statuses: voided, unpaid, voided and withdrawn, awaiting Alipay payment, awaiting WeChat payment
       AND o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' )
       AND o._is_delete = '0'
       AND o.appointmentid > 0) t
  WHERE
    recordcreatedtime BETWEEN concat( DATE_FORMAT( now() , 'yyyy-MM-dd' ), ' 00:00:00' )
    AND now()
  GROUP BY
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' )
) a
group by ReportTime;
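The outer WHERE clause keeps only rows created between midnight of the current day and now(). Below is a minimal Python sketch of that predicate. It assumes recordcreatedtime is a timestamp (its declared type is not shown in the post). It also assumes now() is evaluated per record, which is how Flink evaluates it in streaming mode. The function name in_today_window is hypothetical.

```python
from datetime import datetime


def in_today_window(record_created: datetime, now: datetime) -> bool:
    """Model of: recordcreatedtime BETWEEN concat(DATE_FORMAT(now(), 'yyyy-MM-dd'), ' 00:00:00') AND now().

    Assumption: recordcreatedtime is a timestamp. BETWEEN is inclusive on both ends.
    """
    start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_day <= record_created <= now


if __name__ == "__main__":
    now = datetime(2022, 3, 15, 12, 25, 18)
    print(in_today_window(datetime(2022, 3, 15, 9, 0), now))    # created today: kept
    print(in_today_window(datetime(2022, 3, 14, 23, 59), now))  # created yesterday: dropped
```

Because the predicate is evaluated again for every incoming record, including -D records, a delete that arrives on a later day than the original insert would no longer pass this filter.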
The DAG is as follows: [image not included in the archive]
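Inserts and updates in the business database are all picked up and produce correct results.
Deletes behave differently. The Kafka CDC format does consume the delete records,
but the SQL result is not decremented accordingly, as shown below: [image not included in the archive]
After the delete, the value should drop from 150 to 100, but nothing happens. It looks as though some internal operator is filtering out this -D record.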
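The expected behavior can be modeled as a changelog flowing through a filter and then a retractable SUM grouped by ReportTime. Below is a simplified Python sketch of that model, not Flink's actual aggregation operator. The row kinds mirror the short strings of Flink's RowKind ("+I", "-U", "+U", "-D"). The split of 150 into rows of 100 and 50 is a hypothetical example.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable


@dataclass(frozen=True)
class ChangeRow:
    kind: str          # "+I", "-U", "+U" or "-D", mirroring Flink RowKind short strings
    report_time: str   # grouping key, e.g. "2022-03-15"
    actual_price: int  # hypothetical amount, for illustration only


def running_sums(rows: Iterable[ChangeRow],
                 keep: Callable[[ChangeRow], bool]) -> Dict[str, int]:
    """Apply a row filter, then a retractable SUM of actual_price per report_time."""
    sums: Dict[str, int] = {}
    for row in rows:
        if not keep(row):
            continue  # a row dropped here, including a -D, never reaches the aggregate
        sign = -1 if row.kind in ("-U", "-D") else 1  # retractions subtract
        sums[row.report_time] = sums.get(row.report_time, 0) + sign * row.actual_price
    return sums


if __name__ == "__main__":
    changelog = [
        ChangeRow("+I", "2022-03-15", 100),
        ChangeRow("+I", "2022-03-15", 50),
        ChangeRow("-D", "2022-03-15", 50),
    ]
    print(running_sums(changelog, lambda r: True))          # delete applied
    print(running_sums(changelog, lambda r: r.kind != "-D"))  # delete filtered out
```

In this model, the sum stays at 150 only if the -D row is discarded before it reaches the aggregate. That matches the symptom described above.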
Any help would be greatly appreciated!
Re: Flink SQL with the Kafka connector (custom CDC format): consumed delete records are not reflected in the computation
Posted by Guo Thompson <gw...@gmail.com>.
I can't see the images.