You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/03/28 00:48:49 UTC

flink sql upsert mysql问题

你好，我这边使用 Flink SQL 实现四条流的关联，后续实现 case when 的逻辑，并将数据插入到 MySQL，但是从结果数据来看，数据存在部分丢失。代码我粘贴在后面，麻烦各位老师指导。下面是 SQL：【create function get_json_value as 'com.nesc.flink.udf.GetJsonValue';
-- Sink NOT NULL handling: rows carrying NULL in a NOT NULL sink column (primary-key
-- columns are implicitly NOT NULL) are dropped instead of failing the job.
-- NOTE(review): the drop is silent, so it is one place where result rows can vanish
-- (e.g. a record whose USER_ID is missing from the JSON).
SET 'table.exec.sink.not-null-enforcer' = 'drop';
-- test environment
-- Sink: MySQL table dm_cust_oact_prog_ri written through the JDBC connector.
-- Because a primary key is declared, the connector writes in upsert mode: inserts and
-- updates become upserts on (user_id, cust_curr_step), and retractions/deletes coming
-- from the query become DELETEs of that key in MySQL.
-- NOTE(review): the physical MySQL table is expected to have a primary/unique key on the
-- same two columns for the upsert to hit the right row — confirm.
CREATE TABLE dm_cust_oact_prog_ri (
    cust_id                 STRING COMMENT '客户id',
    cust_nme                STRING COMMENT '客户姓名',
    cust_mob_tel            STRING COMMENT '客户手机号',
    cust_curr_step          STRING COMMENT '客户当前步骤',
    cust_curr_step_num      INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm  STRING COMMENT '客户当前步骤最近发生时间',
    user_id                 STRING COMMENT '开户时使用的user_id',
    tech_sys_time           STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://111/iap',
    'username'   = 'db_iap',
    'password'   = '加密内容2',
    'table-name' = 'dm_cust_oact_prog_ri'
);
-- Debug sink: same schema and primary key as dm_cust_oact_prog_ri, written with the
-- print connector, which prints every change row together with its change kind
-- (+I / -U / +U / -D). This makes retractions from the joins visible.
CREATE TABLE dm_cust_oact_prog_ri_print (
    cust_id                 STRING COMMENT '客户id',
    cust_nme                STRING COMMENT '客户姓名',
    cust_mob_tel            STRING COMMENT '客户手机号',
    cust_curr_step          STRING COMMENT '客户当前步骤',
    cust_curr_step_num      INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm  STRING COMMENT '客户当前步骤最近发生时间',
    user_id                 STRING COMMENT '开户时使用的user_id',
    tech_sys_time           STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector' = 'print'
);
-- Source: app-flow records from Kafka topic BUSINFLOWRECORD.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- proc_time is a processing-time attribute; the view in this script does not use it.
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_cust_oact_rec_ri
(
    op_type        STRING,
    op_ts          STRING,
    `after`        STRING,
    current_ts     STRING,
    curr_datetime  AS get_json_value(`after`, 'CURR_DATETIME'),
    user_id        AS get_json_value(`after`, 'USER_ID'),
    request_no     AS get_json_value(`after`, 'REQUEST_NO'),
    business_flag  AS get_json_value(`after`, 'BUSINESS_FLAG'),
    proc_time      AS PROCTIME()
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'BUSINFLOWRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Source: customer information from Kafka topic USERQUERYEXTINFO.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_cust_info_ri
(
    op_type          STRING,
    op_ts            STRING,
    `after`          STRING,
    current_ts       STRING,
    client_name      AS get_json_value(`after`, 'CLIENT_NAME'),
    request_status   AS get_json_value(`after`, 'REQUEST_STATUS'),
    mobile_tel       AS get_json_value(`after`, 'MOBILE_TEL'),
    user_id          AS get_json_value(`after`, 'USER_ID'),
    active_datetime  AS get_json_value(`after`, 'ACTIVE_DATETIME'),
    channel_code     AS get_json_value(`after`, 'CHANNEL_CODE'),
    broker_code      AS get_json_value(`after`, 'BROKER_CODE'),
    user_gender      AS get_json_value(`after`, 'USER_GENDER'),
    birthday         AS get_json_value(`after`, 'BIRTHDAY'),
    client_id        AS get_json_value(`after`, 'CLIENT_ID')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'USERQUERYEXTINFO',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Source: audit records from Kafka topic CRH_USER.BUSINFLOWAUDITRECORD.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_audit_rec_ri
(
    op_type        STRING,
    op_ts          STRING,
    `after`        STRING,
    current_ts     STRING,
    business_flag  AS get_json_value(`after`, 'BUSINESS_FLAG'),
    curr_datetime  AS get_json_value(`after`, 'CURR_DATETIME'),
    request_no     AS get_json_value(`after`, 'REQUEST_NO')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.BUSINFLOWAUDITRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Source: video-verification records from Kafka topic CRH_USER.USERVIDEOFLOW.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- join_position_str is matched against dm_crh_audit_rec_ri.request_no in the view.
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_user_vidro_ri
(
    op_type            STRING,
    op_ts              STRING,
    `after`            STRING,
    current_ts         STRING,
    business_flag      AS get_json_value(`after`, 'BUSINESS_FLAG'),
    create_datetime    AS get_json_value(`after`, 'CREATE_DATETIME'),
    user_id            AS get_json_value(`after`, 'USER_ID'),
    join_position_str  AS get_json_value(`after`, 'JOIN_POSITION_STR')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.USERVIDEOFLOW',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Account-opening progress, one row per (user_id, cust_curr_step).
--
-- Inputs (unbounded Kafka streams combined with regular joins):
--   t1  latest app-flow record per (user_id, business_flag) from dm_crh_cust_oact_rec_ri,
--       limited to the tracked step codes;
--   t2  customer info (client_id, name, mobile, activation time), joined on user_id;
--   t5  video-verification records, each LEFT JOINed to its audit record via
--       join_position_str = request_no, kept when either side has a tracked flag.
-- The step CASE maps each joined row to ONE step, checked in priority order
-- (activated > manual audit > submitted > video > ... > registration). The time CASE
-- follows the same order, so the timestamp belongs to the chosen step. The outer
-- ROW_NUMBER keeps the latest row per (user_id, cust_curr_step).
--
-- NOTE(review): why rows go missing in MySQL. A regular LEFT JOIN first emits a t1 row
-- with NULL t2/t5 columns, then retracts it when the matching t2/t5 record arrives.
-- Because cust_curr_step is derived from t2/t5 (e.g. once t2.active_datetime is set,
-- every t1 row of that user maps to '开户成功'), the retraction of the previous row
-- reaches the upsert sink as a DELETE of key (user_id, <previous step>). Earlier steps
-- are therefore removed instead of kept. If every step reached should stay in the table,
-- derive each step from its own source record (e.g. UNION ALL of per-source step rows,
-- then dedup per (user_id, step)) rather than overriding t1's step with t2/t5 facts.
-- NOTE(review): '视屏验证' looks like a typo for '视频验证'. It is left unchanged because
-- it is written to MySQL as part of the primary key.
create view v_dm_cust_oact_prog_ri as
select
    cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    -- Non-deterministic: evaluated whenever a change row is produced (retractions included),
    -- so it records the processing time of that change, not a business time.
    ,FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time
from
(
    -- Keep the most recent row per sink key (user_id, cust_curr_step).
    -- NOTE(review): DESC compares timestamp strings lexicographically; this assumes every
    -- source uses the same time format once '-' is removed — confirm.
    select
        cust_id
        ,cust_nme
        ,cust_mob_tel
        ,cust_curr_step
        ,cust_curr_step_num
        ,cust_curr_step_occu_tm
        ,user_id
        ,ROW_NUMBER() OVER(PARTITION BY user_id,cust_curr_step ORDER BY cust_curr_step_occu_tm DESC) AS rn
    from
    (
        select
            t2.client_id as cust_id
            ,t2.client_name as cust_nme
            ,t2.mobile_tel as cust_mob_tel
            -- Step label; the first matching branch (highest step) wins.
            ,case when t2.active_datetime is not null then '开户成功'
                when t5.business_flag_audit in ('1003','1011') then '人工审核'
                when t1.business_flag = '22114' then '提交申请'
                when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then '视屏验证'
                when t1.business_flag = '22113' then '绑定三方存管'
                when t1.business_flag = '22112' then '设置密码'
                when t1.business_flag = '22109' then '协议签署'
                when t1.business_flag = '22108' then '开通账户选择'
                when t1.business_flag = '22110' then '风险评测'
                when t1.business_flag = '22106' then '填写基本资料'
                when t1.business_flag = '22107' then '上传身份证'
                when t1.business_flag = '22111' then '选择营业部'
                when t1.business_flag = '12100' then '新开户:注册申请开户' end as cust_curr_step
            -- Numeric step (1..13), same branch order as cust_curr_step.
            ,case when t2.active_datetime is not null then 13
                when t5.business_flag_audit in ('1003','1011') then 12
                when t1.business_flag = '22114' then 11
                when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then 10
                when t1.business_flag = '22113' then 9
                when t1.business_flag = '22112' then 8
                when t1.business_flag = '22109' then 7
                when t1.business_flag = '22108' then 6
                when t1.business_flag = '22110' then 5
                when t1.business_flag = '22106' then 4
                when t1.business_flag = '22107' then 3
                when t1.business_flag = '22111' then 2
                when t1.business_flag = '12100' then 1 end as cust_curr_step_num
            -- Time of the step chosen above. These branches mirror the step CASE so the
            -- time always belongs to that step: a step that comes from t1 (e.g. '提交申请')
            -- keeps t1's time even when a t5 video/audit record exists, and a video step
            -- uses the video time rather than an untracked audit record's time.
            ,case when t2.active_datetime is not null then t2.active_datetime
                  when t5.business_flag_audit in ('1003','1011') then t5.curr_datetime
                  when t1.business_flag = '22114' then t1.curr_datetime
                  when t5.business_flag_video in ('1200','1202','1203') then t5.create_datetime
                  else t1.curr_datetime end as cust_curr_step_occu_tm
            ,t1.user_id
        from
        (
            -- t1: latest app-flow record per (user_id, business_flag) for the tracked step codes.
            select
                curr_datetime
                ,user_id
                ,business_flag
            from
            (
                select
                    replace(curr_datetime,'-','') as curr_datetime
                    ,user_id
                    ,business_flag
                    ,ROW_NUMBER() OVER(PARTITION BY user_id,business_flag ORDER BY curr_datetime DESC) AS rn
                from dm_crh_cust_oact_rec_ri
                where business_flag in ('22114','22113','22112','22109','22108','22110','22106','22107','22111','12100','22115','33500')
            )t
            where rn = 1
        ) t1
        left join
        (
            -- t2: customer info; activation time cut to its first 19 characters, '-' removed.
            select
                user_id
                ,client_name
                ,mobile_tel
                ,replace(substr(active_datetime,1,19),'-','') as active_datetime
                ,client_id
            from dm_crh_cust_info_ri
        ) t2
        on t1.user_id = t2.user_id
        left join
        (
            -- t5: video records with their optional audit record; a row is kept when the
            -- video flag or the audit flag is one of the tracked codes.
            select
                t1.user_id
                ,t1.join_position_str
                ,replace(t1.create_datetime,'-','') as create_datetime
                ,t1.business_flag AS business_flag_video
                ,t2.business_flag AS business_flag_audit
                ,replace(t2.curr_datetime,'-','') as curr_datetime
            from dm_crh_user_vidro_ri t1
            left join dm_crh_audit_rec_ri t2
            on t1.join_position_str = t2.request_no
            where t1.business_flag in ('1200','1202','1203')
            or t2.business_flag in ('1003','1011')
        ) t5
        on t1.user_id = t5.user_id
    ) t
    where cust_curr_step is not null
) t
where rn = 1
;
-- Debug: send the view's changelog to the print sink (one line per change row).
INSERT INTO dm_cust_oact_prog_ri_print
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    tech_sys_time
FROM v_dm_cust_oact_prog_ri;
-- Upsert the progress view into the MySQL sink, keyed by (user_id, cust_curr_step).
-- NOTE(review): this INSERT and the print INSERT are separate statements. When run from
-- the SQL client they are presumably submitted as two independent jobs, each running the
-- whole join pipeline. Wrap both in a statement set if a single job is intended.
INSERT INTO dm_cust_oact_prog_ri
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    tech_sys_time
FROM v_dm_cust_oact_prog_ri;

-- NOTE(review): everything from here on repeats the script above (it was pasted twice).
-- Run in one session, the repeated CREATE FUNCTION / CREATE TABLE statements are expected
-- to fail because the objects already exist. Keep only one copy.
CREATE FUNCTION get_json_value AS 'com.nesc.flink.udf.GetJsonValue';
-- Sink NOT NULL handling: rows carrying NULL in a NOT NULL sink column (primary-key
-- columns are implicitly NOT NULL) are dropped instead of failing the job.
-- NOTE(review): the drop is silent, so it is one place where result rows can vanish
-- (e.g. a record whose USER_ID is missing from the JSON).
SET 'table.exec.sink.not-null-enforcer' = 'drop';
-- test environment
-- Sink: MySQL table dm_cust_oact_prog_ri written through the JDBC connector.
-- Because a primary key is declared, the connector writes in upsert mode: inserts and
-- updates become upserts on (user_id, cust_curr_step), and retractions/deletes coming
-- from the query become DELETEs of that key in MySQL.
-- NOTE(review): the physical MySQL table is expected to have a primary/unique key on the
-- same two columns for the upsert to hit the right row — confirm.
CREATE TABLE dm_cust_oact_prog_ri (
    cust_id                 STRING COMMENT '客户id',
    cust_nme                STRING COMMENT '客户姓名',
    cust_mob_tel            STRING COMMENT '客户手机号',
    cust_curr_step          STRING COMMENT '客户当前步骤',
    cust_curr_step_num      INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm  STRING COMMENT '客户当前步骤最近发生时间',
    user_id                 STRING COMMENT '开户时使用的user_id',
    tech_sys_time           STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://111/iap',
    'username'   = 'db_iap',
    'password'   = '加密内容2',
    'table-name' = 'dm_cust_oact_prog_ri'
);
-- Debug sink: same schema and primary key as dm_cust_oact_prog_ri, written with the
-- print connector, which prints every change row together with its change kind
-- (+I / -U / +U / -D). This makes retractions from the joins visible.
CREATE TABLE dm_cust_oact_prog_ri_print (
    cust_id                 STRING COMMENT '客户id',
    cust_nme                STRING COMMENT '客户姓名',
    cust_mob_tel            STRING COMMENT '客户手机号',
    cust_curr_step          STRING COMMENT '客户当前步骤',
    cust_curr_step_num      INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm  STRING COMMENT '客户当前步骤最近发生时间',
    user_id                 STRING COMMENT '开户时使用的user_id',
    tech_sys_time           STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector' = 'print'
);
-- Source: app-flow records from Kafka topic BUSINFLOWRECORD.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- proc_time is a processing-time attribute; the view in this script does not use it.
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_cust_oact_rec_ri
(
    op_type        STRING,
    op_ts          STRING,
    `after`        STRING,
    current_ts     STRING,
    curr_datetime  AS get_json_value(`after`, 'CURR_DATETIME'),
    user_id        AS get_json_value(`after`, 'USER_ID'),
    request_no     AS get_json_value(`after`, 'REQUEST_NO'),
    business_flag  AS get_json_value(`after`, 'BUSINESS_FLAG'),
    proc_time      AS PROCTIME()
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'BUSINFLOWRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Source: customer information from Kafka topic USERQUERYEXTINFO.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_cust_info_ri
(
    op_type          STRING,
    op_ts            STRING,
    `after`          STRING,
    current_ts       STRING,
    client_name      AS get_json_value(`after`, 'CLIENT_NAME'),
    request_status   AS get_json_value(`after`, 'REQUEST_STATUS'),
    mobile_tel       AS get_json_value(`after`, 'MOBILE_TEL'),
    user_id          AS get_json_value(`after`, 'USER_ID'),
    active_datetime  AS get_json_value(`after`, 'ACTIVE_DATETIME'),
    channel_code     AS get_json_value(`after`, 'CHANNEL_CODE'),
    broker_code      AS get_json_value(`after`, 'BROKER_CODE'),
    user_gender      AS get_json_value(`after`, 'USER_GENDER'),
    birthday         AS get_json_value(`after`, 'BIRTHDAY'),
    client_id        AS get_json_value(`after`, 'CLIENT_ID')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'USERQUERYEXTINFO',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Source: audit records from Kafka topic CRH_USER.BUSINFLOWAUDITRECORD.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_audit_rec_ri
(
    op_type        STRING,
    op_ts          STRING,
    `after`        STRING,
    current_ts     STRING,
    business_flag  AS get_json_value(`after`, 'BUSINESS_FLAG'),
    curr_datetime  AS get_json_value(`after`, 'CURR_DATETIME'),
    request_no     AS get_json_value(`after`, 'REQUEST_NO')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.BUSINFLOWAUDITRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Source: video-verification records from Kafka topic CRH_USER.USERVIDEOFLOW.
-- The physical columns hold the raw message envelope; business fields are computed
-- from the `after` JSON string by the project UDF get_json_value (presumably a key
-- lookup in that JSON — confirm against com.nesc.flink.udf.GetJsonValue).
-- join_position_str is matched against dm_crh_audit_rec_ri.request_no in the view.
-- 'latest-offset': messages produced before the job starts are never read, so they
-- cannot take part in the joins.
CREATE TABLE dm_crh_user_vidro_ri
(
    op_type            STRING,
    op_ts              STRING,
    `after`            STRING,
    current_ts         STRING,
    business_flag      AS get_json_value(`after`, 'BUSINESS_FLAG'),
    create_datetime    AS get_json_value(`after`, 'CREATE_DATETIME'),
    user_id            AS get_json_value(`after`, 'USER_ID'),
    join_position_str  AS get_json_value(`after`, 'JOIN_POSITION_STR')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.USERVIDEOFLOW',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    -- To replay from a point in time instead:
    --   'scan.startup.mode' = 'timestamp',
    --   'scan.startup.timestamp-millis' = '1675353600000'  -- unix_timestamp('2023-02-03 00:00:00') * 1000
    'json.fail-on-missing-field'   = 'false',  -- a missing field becomes NULL instead of failing the job
    'json.ignore-parse-errors'     = 'true'    -- parse errors are skipped instead of failing the job
);
-- Account-opening progress, one row per (user_id, cust_curr_step).
--
-- Inputs (unbounded Kafka streams combined with regular joins):
--   t1  latest app-flow record per (user_id, business_flag) from dm_crh_cust_oact_rec_ri,
--       limited to the tracked step codes;
--   t2  customer info (client_id, name, mobile, activation time), joined on user_id;
--   t5  video-verification records, each LEFT JOINed to its audit record via
--       join_position_str = request_no, kept when either side has a tracked flag.
-- The step CASE maps each joined row to ONE step, checked in priority order
-- (activated > manual audit > submitted > video > ... > registration). The time CASE
-- follows the same order, so the timestamp belongs to the chosen step. The outer
-- ROW_NUMBER keeps the latest row per (user_id, cust_curr_step).
--
-- NOTE(review): why rows go missing in MySQL. A regular LEFT JOIN first emits a t1 row
-- with NULL t2/t5 columns, then retracts it when the matching t2/t5 record arrives.
-- Because cust_curr_step is derived from t2/t5 (e.g. once t2.active_datetime is set,
-- every t1 row of that user maps to '开户成功'), the retraction of the previous row
-- reaches the upsert sink as a DELETE of key (user_id, <previous step>). Earlier steps
-- are therefore removed instead of kept. If every step reached should stay in the table,
-- derive each step from its own source record (e.g. UNION ALL of per-source step rows,
-- then dedup per (user_id, step)) rather than overriding t1's step with t2/t5 facts.
-- NOTE(review): '视屏验证' looks like a typo for '视频验证'. It is left unchanged because
-- it is written to MySQL as part of the primary key.
create view v_dm_cust_oact_prog_ri as
select
    cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    -- Non-deterministic: evaluated whenever a change row is produced (retractions included),
    -- so it records the processing time of that change, not a business time.
    ,FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time
from
(
    -- Keep the most recent row per sink key (user_id, cust_curr_step).
    -- NOTE(review): DESC compares timestamp strings lexicographically; this assumes every
    -- source uses the same time format once '-' is removed — confirm.
    select
        cust_id
        ,cust_nme
        ,cust_mob_tel
        ,cust_curr_step
        ,cust_curr_step_num
        ,cust_curr_step_occu_tm
        ,user_id
        ,ROW_NUMBER() OVER(PARTITION BY user_id,cust_curr_step ORDER BY cust_curr_step_occu_tm DESC) AS rn
    from
    (
        select
            t2.client_id as cust_id
            ,t2.client_name as cust_nme
            ,t2.mobile_tel as cust_mob_tel
            -- Step label; the first matching branch (highest step) wins.
            ,case when t2.active_datetime is not null then '开户成功'
                when t5.business_flag_audit in ('1003','1011') then '人工审核'
                when t1.business_flag = '22114' then '提交申请'
                when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then '视屏验证'
                when t1.business_flag = '22113' then '绑定三方存管'
                when t1.business_flag = '22112' then '设置密码'
                when t1.business_flag = '22109' then '协议签署'
                when t1.business_flag = '22108' then '开通账户选择'
                when t1.business_flag = '22110' then '风险评测'
                when t1.business_flag = '22106' then '填写基本资料'
                when t1.business_flag = '22107' then '上传身份证'
                when t1.business_flag = '22111' then '选择营业部'
                when t1.business_flag = '12100' then '新开户:注册申请开户' end as cust_curr_step
            -- Numeric step (1..13), same branch order as cust_curr_step.
            ,case when t2.active_datetime is not null then 13
                when t5.business_flag_audit in ('1003','1011') then 12
                when t1.business_flag = '22114' then 11
                when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then 10
                when t1.business_flag = '22113' then 9
                when t1.business_flag = '22112' then 8
                when t1.business_flag = '22109' then 7
                when t1.business_flag = '22108' then 6
                when t1.business_flag = '22110' then 5
                when t1.business_flag = '22106' then 4
                when t1.business_flag = '22107' then 3
                when t1.business_flag = '22111' then 2
                when t1.business_flag = '12100' then 1 end as cust_curr_step_num
            -- Time of the step chosen above. These branches mirror the step CASE so the
            -- time always belongs to that step: a step that comes from t1 (e.g. '提交申请')
            -- keeps t1's time even when a t5 video/audit record exists, and a video step
            -- uses the video time rather than an untracked audit record's time.
            ,case when t2.active_datetime is not null then t2.active_datetime
                  when t5.business_flag_audit in ('1003','1011') then t5.curr_datetime
                  when t1.business_flag = '22114' then t1.curr_datetime
                  when t5.business_flag_video in ('1200','1202','1203') then t5.create_datetime
                  else t1.curr_datetime end as cust_curr_step_occu_tm
            ,t1.user_id
        from
        (
            -- t1: latest app-flow record per (user_id, business_flag) for the tracked step codes.
            select
                curr_datetime
                ,user_id
                ,business_flag
            from
            (
                select
                    replace(curr_datetime,'-','') as curr_datetime
                    ,user_id
                    ,business_flag
                    ,ROW_NUMBER() OVER(PARTITION BY user_id,business_flag ORDER BY curr_datetime DESC) AS rn
                from dm_crh_cust_oact_rec_ri
                where business_flag in ('22114','22113','22112','22109','22108','22110','22106','22107','22111','12100','22115','33500')
            )t
            where rn = 1
        ) t1
        left join
        (
            -- t2: customer info; activation time cut to its first 19 characters, '-' removed.
            select
                user_id
                ,client_name
                ,mobile_tel
                ,replace(substr(active_datetime,1,19),'-','') as active_datetime
                ,client_id
            from dm_crh_cust_info_ri
        ) t2
        on t1.user_id = t2.user_id
        left join
        (
            -- t5: video records with their optional audit record; a row is kept when the
            -- video flag or the audit flag is one of the tracked codes.
            select
                t1.user_id
                ,t1.join_position_str
                ,replace(t1.create_datetime,'-','') as create_datetime
                ,t1.business_flag AS business_flag_video
                ,t2.business_flag AS business_flag_audit
                ,replace(t2.curr_datetime,'-','') as curr_datetime
            from dm_crh_user_vidro_ri t1
            left join dm_crh_audit_rec_ri t2
            on t1.join_position_str = t2.request_no
            where t1.business_flag in ('1200','1202','1203')
            or t2.business_flag in ('1003','1011')
        ) t5
        on t1.user_id = t5.user_id
    ) t
    where cust_curr_step is not null
) t
where rn = 1
;
-- Debug: send the view's changelog to the print sink (one line per change row).
INSERT INTO dm_cust_oact_prog_ri_print
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    tech_sys_time
FROM v_dm_cust_oact_prog_ri;
-- Upsert the progress view into the MySQL sink, keyed by (user_id, cust_curr_step).
-- Terminated with ';' like every other statement in the script.
-- NOTE(review): this INSERT and the print INSERT are separate statements. When run from
-- the SQL client they are presumably submitted as two independent jobs, each running the
-- whole join pipeline. Wrap both in a statement set if a single job is intended.
INSERT INTO dm_cust_oact_prog_ri
SELECT
    cust_id,
    cust_nme,
    cust_mob_tel,
    cust_curr_step,
    cust_curr_step_num,
    cust_curr_step_occu_tm,
    user_id,
    tech_sys_time
FROM v_dm_cust_oact_prog_ri;
】
| |
小昌同学
|
|
ccc0606fighting@163.com
|