Posted to user-zh@flink.apache.org by 赵旭晨 <jj...@163.com> on 2022/03/01 07:14:02 UTC

Problem consuming upsert-kafka with a CUMULATE WINDOW

The SQL is as follows:
with effective_chargeorder as (
select o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o
where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) 
and o._is_delete = '0' and o.appointmentid > 0
)
--select * from effective_chargeorder;  
SELECT window_start, window_end, SUM(actualprice)
  FROM TABLE(
    CUMULATE(TABLE effective_chargeorder, DESCRIPTOR(proc_time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;


The IDE reports the following error:

StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node ChangelogNormalize(key=[id])

The source being consumed is upsert-kafka. I don't know the planner layer well enough to make sense of this; any pointers would be appreciated.
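For context, a typical upsert-kafka source DDL looks like the sketch below (topic, server, and formats here are illustrative, not taken from this thread). The PRIMARY KEY that the connector requires is what makes the planner add the ChangelogNormalize node; the table then emits update/delete rows, which is exactly what StreamPhysicalWindowAggregate refuses to consume:

-- Illustrative upsert-kafka DDL, trimmed to the columns used above.
CREATE TABLE t_k_chargeorder (
    id BIGINT,
    recordcreatedtime TIMESTAMP(3),
    recordcreateduser BIGINT,
    status STRING,
    _is_delete STRING,
    appointmentid BIGINT,
    tenantid BIGINT,
    actualprice DECIMAL(10, 2),
    proc_time AS PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED  -- required by upsert-kafka; turns the source into a changelog
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'chargeorder',                           -- illustrative
    'properties.bootstrap.servers' = 'localhost:9092', -- illustrative
    'key.format' = 'json',
    'value.format' = 'json'
);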

Re: Flink SQL: delete records consumed by the Kafka connector (custom CDC format) are not reflected in the computation

Posted by Guo Thompson <gw...@gmail.com>.
I can't see the image.

赵旭晨 <jj...@163.com> wrote on Tue, Mar 15, 2022 at 12:25:

> Flink version: 1.14.3. The scenario is as follows:
> SQL:
> set table.exec.state.ttl=1 day;
> describe t_k_chargeorder;
> describe t_k_appointment;
> SELECT
> ReportTime,
> sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount )
> kpitotalcount,
> sum( InsertActualPriceCount ) InsertActualPriceCount,
> sum( InsertAppointmentCount ) InsertAppointmentCount,
> sum( InsertChargeOrderCount ) InsertChargeOrderCount,
> now() LastUpdatedDT
> from
> (
> SELECT
>     DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) ReportTime,
>     sum( actualprice ) InsertActualPriceCount,
>     0 InsertShortMessageCount,
>     0 InsertAppointmentCount,
>     0 InsertImageCount,
>     0 InsertChargeOrderCount,
>     0 InsertPerioExamCount,
>     0 InsertMedicalCount,
>     0 InsertPatientCount,
>     0 InsertGeneralExamCount,
>     0 InsertFollowupCount
> FROM
>     --effective_chargeorder t
>     (SELECT
>         o.recordcreatedtime,
>         o.recordcreateduser,
>         o.status,
>         o._is_delete,
>         o.appointmentid,
>         o.id,
>         o.tenantid,
>         o.actualprice,
>         o.proc_time,
>         t.Name,
>         t.IsInactive
>     FROM
>         t_k_chargeorder AS o
>         INNER JOIN t_dental_tenant FOR SYSTEM_TIME AS OF o.proc_time AS t
> ON o.tenantid = t.Id
>     WHERE
>         t.IsInactive = '0'
>         AND o.recordcreateduser > 0
>         AND o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' )
>         AND o._is_delete = '0'
>         AND o.appointmentid > 0) t
> WHERE
>     recordcreatedtime BETWEEN concat( DATE_FORMAT( now() , 'yyyy-MM-dd' ),
> ' 00:00:00' )
>     AND now()
> GROUP BY
>     DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' )
> ) a
> group by ReportTime;
>
> The DAG is as follows:
> Inserts and updates in the business database are all picked up and produce correct results.
> But for deletes, the Kafka CDC format does consume the delete records,
>
> yet the SQL result is not decremented accordingly, as follows:
> after the delete the total should have gone from 150 to 100, but nothing happened; it feels as if an internal operator filtered out this -D row.
> Any guidance would be much appreciated.
>

Flink SQL: delete records consumed by the Kafka connector (custom CDC format) are not reflected in the computation

Posted by 赵旭晨 <jj...@163.com>.
Flink version: 1.14.3. The scenario is as follows:
SQL:
set table.exec.state.ttl=1 day;
describe t_k_chargeorder;
describe t_k_appointment;
SELECT
ReportTime,
sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount )  kpitotalcount,
sum( InsertActualPriceCount ) InsertActualPriceCount,
sum( InsertAppointmentCount ) InsertAppointmentCount,
sum( InsertChargeOrderCount ) InsertChargeOrderCount,
now() LastUpdatedDT 
from 
(
SELECT
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) ReportTime,
    sum( actualprice ) InsertActualPriceCount,
    0 InsertShortMessageCount,
    0 InsertAppointmentCount,
    0 InsertImageCount,
    0 InsertChargeOrderCount,
    0 InsertPerioExamCount,
    0 InsertMedicalCount,
    0 InsertPatientCount,
    0 InsertGeneralExamCount,
    0 InsertFollowupCount 
FROM
    --effective_chargeorder t 
    (SELECT
        o.recordcreatedtime,
        o.recordcreateduser,
        o.status,
        o._is_delete,
        o.appointmentid,
        o.id,
        o.tenantid,
        o.actualprice,
        o.proc_time,
        t.Name,
        t.IsInactive 
    FROM
        t_k_chargeorder AS o
        INNER JOIN t_dental_tenant FOR SYSTEM_TIME AS OF o.proc_time AS t ON o.tenantid = t.Id 
    WHERE
        t.IsInactive = '0' 
        AND o.recordcreateduser > 0 
        AND o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) 
        AND o._is_delete = '0' 
        AND o.appointmentid > 0) t
WHERE
    recordcreatedtime BETWEEN concat( DATE_FORMAT( now() , 'yyyy-MM-dd' ), ' 00:00:00' ) 
    AND now() 
GROUP BY
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' )  
) a
group by ReportTime;


The DAG is as follows:
Inserts and updates in the business database are all picked up and produce correct results.
But for deletes, the Kafka CDC format does consume the delete records,

yet the SQL result is not decremented accordingly, as follows:
after the delete the total should have gone from 150 to 100, but nothing happened; it feels as if an internal operator filtered out this -D row.
Any guidance would be much appreciated.
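
One thing worth checking (an assumption, not something confirmed in this thread): some CDC formats populate only the key columns on DELETE rows, so predicates such as o.status NOT IN (...) or o._is_delete = '0' evaluate to UNKNOWN on the -D row and a Calc operator drops the retraction before it ever reaches the SUM. The optimized plan shows which operators sit between the source and the aggregation; a sketch, using a simplified version of the query above:

-- Sketch: print the optimized plan to see where -D rows could be
-- filtered out. The simplified query keeps only the columns relevant
-- to the delete path.
EXPLAIN PLAN FOR
SELECT
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) AS ReportTime,
    SUM( actualprice ) AS InsertActualPriceCount
FROM t_k_chargeorder
WHERE _is_delete = '0'
GROUP BY DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' );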

Re: Problem consuming upsert-kafka with a CUMULATE WINDOW

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Window TVFs currently do not support consuming changelogs; that is, they can only consume insert-only data. The upsert-kafka source is a source that produces a changelog, so a window TVF cannot be placed downstream of it.
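
For reference, a minimal sketch of one workaround, reusing the CTE from the quoted query below: a plain, non-windowed GROUP BY on a derived time bucket can consume update and delete changes, at the cost of emitting an updating (retracting) result stream instead of append-only window results.

-- Sketch: aggregate per time bucket with a regular GROUP BY instead of
-- the CUMULATE TVF; this accepts the changelog from upsert-kafka.
with effective_chargeorder as (
select o.recordcreatedtime, o.actualprice from t_k_chargeorder as o
where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' )
and o._is_delete = '0' and o.appointmentid > 0
)
SELECT
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd HH:mm' ) AS bucket_start,
    SUM( actualprice ) AS total_price
FROM effective_chargeorder
GROUP BY DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd HH:mm' );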

赵旭晨 <jj...@163.com> wrote on Tue, Mar 1, 2022 at 15:23:

> The SQL is as follows:
> with effective_chargeorder as (
> select
> o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,
> o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o
> where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回',
> '等待支付宝付费', '等待微信付费' )
> and o._is_delete = '0' and o.appointmentid > 0
> )
> --select * from effective_chargeorder;
> SELECT window_start, window_end, SUM(actualprice)
>   FROM TABLE(
>     CUMULATE(TABLE effective_chargeorder, DESCRIPTOR(proc_time), INTERVAL
> '2' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
>
> The IDE reports the following error:
> StreamPhysicalWindowAggregate doesn't support consuming update and delete
> changes which is produced by node ChangelogNormalize(key=[id])
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:394)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:353)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:341)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
> The source being consumed is upsert-kafka. I don't know the planner layer well enough to make sense of this; any pointers would be appreciated.
>

Re: Problem consuming upsert-kafka with a CUMULATE WINDOW

Posted by 赵旭晨 <jj...@163.com>.
The SQL is as follows:
with effective_chargeorder as (
select o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o
where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) 
and o._is_delete = '0' and o.appointmentid > 0
)
--select * from effective_chargeorder;  
SELECT window_start, window_end, SUM(actualprice)
  FROM TABLE(
    CUMULATE(TABLE effective_chargeorder, DESCRIPTOR(proc_time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;


The IDE reports the following error:
StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node ChangelogNormalize(key=[id])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:394)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:353)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)


The source being consumed is upsert-kafka. I don't know the planner layer well enough to make sense of this; any pointers would be appreciated.