You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/03/28 00:48:49 UTC
flink sql upsert mysql问题
你好，我这边使用 Flink SQL 实现四条流的关联，之后实现 CASE WHEN 的逻辑，并将数据插入到 MySQL。但从结果数据来看，存在部分数据丢失。代码我粘贴在后面，麻烦各位老师指导。下面是 SQL【
create function get_json_value as 'com.nesc.flink.udf.GetJsonValue';
-- Drop sink rows that violate a NOT NULL constraint instead of failing the job.
-- The primary keys declared below make user_id and cust_curr_step NOT NULL, so any
-- row with a NULL in either column is discarded silently before reaching the sinks.
SET 'table.exec.sink.not-null-enforcer' = 'drop';

-- ---- test environment ----

-- MySQL target. Declaring a primary key puts the JDBC sink in upsert mode keyed on
-- (user_id, cust_curr_step).
-- NOTE(review): the physical MySQL table presumably needs a matching PRIMARY or
-- UNIQUE key for upserts to update rows in place - confirm.
CREATE TABLE dm_cust_oact_prog_ri (
    cust_id                STRING COMMENT '客户id',
    cust_nme               STRING COMMENT '客户姓名',
    cust_mob_tel           STRING COMMENT '客户手机号',
    cust_curr_step         STRING COMMENT '客户当前步骤',
    cust_curr_step_num     INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间',
    user_id                STRING COMMENT '开户时使用的user_id',
    tech_sys_time          STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://111/iap',
    'username'   = 'db_iap',
    'password'   = '加密内容2',
    'table-name' = 'dm_cust_oact_prog_ri'
);

-- Same schema and key, written to stdout by the print connector for debugging.
CREATE TABLE dm_cust_oact_prog_ri_print (
    cust_id                STRING COMMENT '客户id',
    cust_nme               STRING COMMENT '客户姓名',
    cust_mob_tel           STRING COMMENT '客户手机号',
    cust_curr_step         STRING COMMENT '客户当前步骤',
    cust_curr_step_num     INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间',
    user_id                STRING COMMENT '开户时使用的user_id',
    tech_sys_time          STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector' = 'print'
);
-- Kafka sources. Every topic carries change records whose `after` field is a JSON
-- string, and the computed columns pull single fields out of it with the
-- get_json_value UDF. All four tables share these options:
--   * reading starts at the latest offset. To replay from a point in time, switch to
--     'scan.startup.mode' = 'timestamp' plus 'scan.startup.timestamp-millis', e.g.
--     '1675353600000' = unix_timestamp('2023-02-03 00:00:00') * 1000
--   * a missing JSON field does not fail the job
--   * a message that cannot be parsed is skipped
-- NOTE(review): skipped messages are not reported anywhere, and op_type is never
-- filtered, so every change record (presumably including updates and deletes) is read
-- as a new row. Both are worth ruling out when output rows go missing.

-- Topic BUSINFLOWRECORD: business-flow records.
CREATE TABLE dm_crh_cust_oact_rec_ri (
    op_type       STRING,
    op_ts         STRING,
    `after`       STRING,
    current_ts    STRING,
    curr_datetime AS get_json_value(`after`, 'CURR_DATETIME'),
    user_id       AS get_json_value(`after`, 'USER_ID'),
    request_no    AS get_json_value(`after`, 'REQUEST_NO'),
    business_flag AS get_json_value(`after`, 'BUSINESS_FLAG'),
    proc_time     AS PROCTIME()
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'BUSINFLOWRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);

-- Topic USERQUERYEXTINFO: customer information.
CREATE TABLE dm_crh_cust_info_ri (
    op_type         STRING,
    op_ts           STRING,
    `after`         STRING,
    current_ts      STRING,
    client_name     AS get_json_value(`after`, 'CLIENT_NAME'),
    request_status  AS get_json_value(`after`, 'REQUEST_STATUS'),
    mobile_tel      AS get_json_value(`after`, 'MOBILE_TEL'),
    user_id         AS get_json_value(`after`, 'USER_ID'),
    active_datetime AS get_json_value(`after`, 'ACTIVE_DATETIME'),
    channel_code    AS get_json_value(`after`, 'CHANNEL_CODE'),
    broker_code     AS get_json_value(`after`, 'BROKER_CODE'),
    user_gender     AS get_json_value(`after`, 'USER_GENDER'),
    birthday        AS get_json_value(`after`, 'BIRTHDAY'),
    client_id       AS get_json_value(`after`, 'CLIENT_ID')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'USERQUERYEXTINFO',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);

-- Topic CRH_USER.BUSINFLOWAUDITRECORD: audit records, keyed by request_no.
CREATE TABLE dm_crh_audit_rec_ri (
    op_type       STRING,
    op_ts         STRING,
    `after`       STRING,
    current_ts    STRING,
    business_flag AS get_json_value(`after`, 'BUSINESS_FLAG'),
    curr_datetime AS get_json_value(`after`, 'CURR_DATETIME'),
    request_no    AS get_json_value(`after`, 'REQUEST_NO')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.BUSINFLOWAUDITRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);

-- Topic CRH_USER.USERVIDEOFLOW: video-flow records.
CREATE TABLE dm_crh_user_vidro_ri (
    op_type           STRING,
    op_ts             STRING,
    `after`           STRING,
    current_ts        STRING,
    business_flag     AS get_json_value(`after`, 'BUSINESS_FLAG'),
    create_datetime   AS get_json_value(`after`, 'CREATE_DATETIME'),
    user_id           AS get_json_value(`after`, 'USER_ID'),
    join_position_str AS get_json_value(`after`, 'JOIN_POSITION_STR')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.USERVIDEOFLOW',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);
-- Customer account-opening progress: one row per (user_id, cust_curr_step), holding
-- the most recent time that step was observed.
--
-- Inputs, combined with regular (unbounded) streaming joins:
--   t1  latest dm_crh_cust_oact_rec_ri record per (user_id, business_flag),
--       restricted to the tracked business_flag codes
--   t2  dm_crh_cust_info_ri customer info, joined on user_id
--   t5  dm_crh_user_vidro_ri video-flow records left-joined to dm_crh_audit_rec_ri
--       audit records (join_position_str = request_no), joined on user_id
--
-- cust_curr_step and cust_curr_step_num are picked by a fixed priority, highest first:
-- 开户成功 (13), 人工审核 (12), 提交申请 (11), 视屏验证 (10), ... , 新开户 (1).
--
-- NOTE(review): the 开户成功, 人工审核 and video branches test t2 or t5 columns, which
-- are matched per user rather than per t1 record. Once such a record arrives, every
-- t1 row of that user is re-labelled. The rows under the old (user_id, cust_curr_step)
-- keys are then retracted (the upsert sink can turn that into DELETEs), and the
-- user's rows collapse onto fewer keys. This is a likely cause of rows missing from
-- MySQL. Confirm whether a full per-step history or only the latest step is intended.
-- NOTE(review): '视屏验证' looks like a typo for '视频验证', but it is the value written
-- to MySQL, so it is kept unchanged.
create view v_dm_cust_oact_prog_ri as
select
     cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    -- Current wall-clock time, re-evaluated whenever a row or an update is emitted.
    ,FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time
from
(
    -- Keep only the latest occurrence of each (user_id, cust_curr_step).
    select
         cust_id
        ,cust_nme
        ,cust_mob_tel
        ,cust_curr_step
        ,cust_curr_step_num
        ,cust_curr_step_occu_tm
        ,user_id
        ,ROW_NUMBER() OVER(PARTITION BY user_id,cust_curr_step ORDER BY cust_curr_step_occu_tm DESC) AS rn
    from
    (
        select
             t2.client_id as cust_id
            ,t2.client_name as cust_nme
            ,t2.mobile_tel as cust_mob_tel
            ,case when t2.active_datetime is not null then '开户成功'
                  when t5.business_flag_audit in ('1003','1011') then '人工审核'
                  when t1.business_flag = '22114' then '提交申请'
                  when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then '视屏验证'
                  when t1.business_flag = '22113' then '绑定三方存管'
                  when t1.business_flag = '22112' then '设置密码'
                  when t1.business_flag = '22109' then '协议签署'
                  when t1.business_flag = '22108' then '开通账户选择'
                  when t1.business_flag = '22110' then '风险评测'
                  when t1.business_flag = '22106' then '填写基本资料'
                  when t1.business_flag = '22107' then '上传身份证'
                  when t1.business_flag = '22111' then '选择营业部'
                  when t1.business_flag = '12100' then '新开户:注册申请开户' end as cust_curr_step
            ,case when t2.active_datetime is not null then 13
                  when t5.business_flag_audit in ('1003','1011') then 12
                  when t1.business_flag = '22114' then 11
                  when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then 10
                  when t1.business_flag = '22113' then 9
                  when t1.business_flag = '22112' then 8
                  when t1.business_flag = '22109' then 7
                  when t1.business_flag = '22108' then 6
                  when t1.business_flag = '22110' then 5
                  when t1.business_flag = '22106' then 4
                  when t1.business_flag = '22107' then 3
                  when t1.business_flag = '22111' then 2
                  when t1.business_flag = '12100' then 1 end as cust_curr_step_num
            -- The occurrence time must come from the record that decided the step, so
            -- these branches mirror the cust_curr_step priority above. Testing only
            -- whether an audit or video record exists would stamp a t1-derived step
            -- (for example 设置密码) with an unrelated audit or video time.
            ,case when t2.active_datetime is not null then t2.active_datetime
                  when t5.business_flag_audit in ('1003','1011') then t5.curr_datetime
                  when t1.business_flag in ('22114','22115','33500') then t1.curr_datetime
                  when t5.business_flag_video in ('1200','1202','1203') then t5.create_datetime
                  else t1.curr_datetime end as cust_curr_step_occu_tm
            ,t1.user_id
        from
        (
            -- t1: latest record per (user_id, business_flag) among the tracked codes.
            select
                 curr_datetime
                ,user_id
                ,business_flag
            from
            (
                select
                     replace(curr_datetime,'-','') as curr_datetime
                    ,user_id
                    ,business_flag
                    ,ROW_NUMBER() OVER(PARTITION BY user_id,business_flag ORDER BY curr_datetime DESC) AS rn
                from dm_crh_cust_oact_rec_ri
                where business_flag in ('22114','22113','22112','22109','22108','22110','22106','22107','22111','12100','22115','33500')
                  -- user_id is part of the sinks' NOT NULL primary key, so a NULL here
                  -- could never be written. Filter it before the joins and rankings.
                  and user_id is not null
            ) t
            where rn = 1
        ) t1
        left join
        (
            -- t2: customer info. active_datetime is cut to 19 chars with '-' removed.
            select
                 user_id
                ,client_name
                ,mobile_tel
                ,replace(substr(active_datetime,1,19),'-','') as active_datetime
                ,client_id
            from dm_crh_cust_info_ri
        ) t2
        on t1.user_id = t2.user_id
        left join
        (
            -- t5: video-flow records with their matching audit record (if any),
            -- kept when either side carries one of the tracked flags.
            select
                 t1.user_id
                ,t1.join_position_str
                ,replace(t1.create_datetime,'-','') as create_datetime
                ,t1.business_flag AS business_flag_video
                ,t2.business_flag AS business_flag_audit
                ,replace(t2.curr_datetime,'-','') as curr_datetime
            from dm_crh_user_vidro_ri t1
            left join dm_crh_audit_rec_ri t2
            on t1.join_position_str = t2.request_no
            where t1.business_flag in ('1200','1202','1203')
               or t2.business_flag in ('1003','1011')
        ) t5
        on t1.user_id = t5.user_id
    ) t
    where cust_curr_step is not null
) t
where rn = 1
;
-- Write the view to both sinks from ONE job. Two stand-alone INSERT statements are
-- submitted as two independent jobs. Each re-runs the four-stream join with its own
-- state, so the print output and the MySQL table are not guaranteed to agree.
-- NOTE(review): EXECUTE STATEMENT SET BEGIN ... END is the syntax of recent Flink SQL
-- clients. Older versions use BEGIN STATEMENT SET ... END instead - confirm against
-- the Flink version in use.
EXECUTE STATEMENT SET
BEGIN
insert into dm_cust_oact_prog_ri_print
select
     cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    ,tech_sys_time
from v_dm_cust_oact_prog_ri
;
insert into dm_cust_oact_prog_ri
select
     cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    ,tech_sys_time
from v_dm_cust_oact_prog_ri
;
END;

-- NOTE(review): everything from here on repeats the script above (it looks pasted
-- twice). Running both copies in one session would fail on the duplicate CREATE
-- statements.
create function get_json_value as 'com.nesc.flink.udf.GetJsonValue';
-- Drop, rather than fail on, sink rows that violate a NOT NULL constraint.
set 'table.exec.sink.not-null-enforcer'='drop';
-- ---- test environment ----
-- MySQL target. Declaring a primary key puts the JDBC sink in upsert mode keyed on
-- (user_id, cust_curr_step), and makes both key columns NOT NULL.
-- NOTE(review): the physical MySQL table presumably needs a matching PRIMARY or
-- UNIQUE key for upserts to update rows in place - confirm.
CREATE TABLE dm_cust_oact_prog_ri (
    cust_id                STRING COMMENT '客户id',
    cust_nme               STRING COMMENT '客户姓名',
    cust_mob_tel           STRING COMMENT '客户手机号',
    cust_curr_step         STRING COMMENT '客户当前步骤',
    cust_curr_step_num     INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间',
    user_id                STRING COMMENT '开户时使用的user_id',
    tech_sys_time          STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://111/iap',
    'username'   = 'db_iap',
    'password'   = '加密内容2',
    'table-name' = 'dm_cust_oact_prog_ri'
);

-- Same schema and key, written to stdout by the print connector for debugging.
CREATE TABLE dm_cust_oact_prog_ri_print (
    cust_id                STRING COMMENT '客户id',
    cust_nme               STRING COMMENT '客户姓名',
    cust_mob_tel           STRING COMMENT '客户手机号',
    cust_curr_step         STRING COMMENT '客户当前步骤',
    cust_curr_step_num     INT    COMMENT '客户当前步骤数字',
    cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间',
    user_id                STRING COMMENT '开户时使用的user_id',
    tech_sys_time          STRING COMMENT '技术字段,更新时间',
    PRIMARY KEY (user_id, cust_curr_step) NOT ENFORCED
) WITH (
    'connector' = 'print'
);
-- Kafka sources. Every topic carries change records whose `after` field is a JSON
-- string, and the computed columns pull single fields out of it with the
-- get_json_value UDF. All four tables share these options:
--   * reading starts at the latest offset. To replay from a point in time, switch to
--     'scan.startup.mode' = 'timestamp' plus 'scan.startup.timestamp-millis', e.g.
--     '1675353600000' = unix_timestamp('2023-02-03 00:00:00') * 1000
--   * a missing JSON field does not fail the job
--   * a message that cannot be parsed is skipped
-- NOTE(review): skipped messages are not reported anywhere, and op_type is never
-- filtered, so every change record (presumably including updates and deletes) is read
-- as a new row. Both are worth ruling out when output rows go missing.

-- Topic BUSINFLOWRECORD: business-flow records.
CREATE TABLE dm_crh_cust_oact_rec_ri (
    op_type       STRING,
    op_ts         STRING,
    `after`       STRING,
    current_ts    STRING,
    curr_datetime AS get_json_value(`after`, 'CURR_DATETIME'),
    user_id       AS get_json_value(`after`, 'USER_ID'),
    request_no    AS get_json_value(`after`, 'REQUEST_NO'),
    business_flag AS get_json_value(`after`, 'BUSINESS_FLAG'),
    proc_time     AS PROCTIME()
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'BUSINFLOWRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);

-- Topic USERQUERYEXTINFO: customer information.
CREATE TABLE dm_crh_cust_info_ri (
    op_type         STRING,
    op_ts           STRING,
    `after`         STRING,
    current_ts      STRING,
    client_name     AS get_json_value(`after`, 'CLIENT_NAME'),
    request_status  AS get_json_value(`after`, 'REQUEST_STATUS'),
    mobile_tel      AS get_json_value(`after`, 'MOBILE_TEL'),
    user_id         AS get_json_value(`after`, 'USER_ID'),
    active_datetime AS get_json_value(`after`, 'ACTIVE_DATETIME'),
    channel_code    AS get_json_value(`after`, 'CHANNEL_CODE'),
    broker_code     AS get_json_value(`after`, 'BROKER_CODE'),
    user_gender     AS get_json_value(`after`, 'USER_GENDER'),
    birthday        AS get_json_value(`after`, 'BIRTHDAY'),
    client_id       AS get_json_value(`after`, 'CLIENT_ID')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'USERQUERYEXTINFO',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);

-- Topic CRH_USER.BUSINFLOWAUDITRECORD: audit records, keyed by request_no.
CREATE TABLE dm_crh_audit_rec_ri (
    op_type       STRING,
    op_ts         STRING,
    `after`       STRING,
    current_ts    STRING,
    business_flag AS get_json_value(`after`, 'BUSINESS_FLAG'),
    curr_datetime AS get_json_value(`after`, 'CURR_DATETIME'),
    request_no    AS get_json_value(`after`, 'REQUEST_NO')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.BUSINFLOWAUDITRECORD',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);

-- Topic CRH_USER.USERVIDEOFLOW: video-flow records.
CREATE TABLE dm_crh_user_vidro_ri (
    op_type           STRING,
    op_ts             STRING,
    `after`           STRING,
    current_ts        STRING,
    business_flag     AS get_json_value(`after`, 'BUSINESS_FLAG'),
    create_datetime   AS get_json_value(`after`, 'CREATE_DATETIME'),
    user_id           AS get_json_value(`after`, 'USER_ID'),
    join_position_str AS get_json_value(`after`, 'JOIN_POSITION_STR')
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'CRH_USER.USERVIDEOFLOW',
    'properties.bootstrap.servers' = '111',
    'properties.group.id'          = 'iap_dm_cust_oact_prog_ri_test_env',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);
-- Customer account-opening progress: one row per (user_id, cust_curr_step), holding
-- the most recent time that step was observed.
--
-- Inputs, combined with regular (unbounded) streaming joins:
--   t1  latest dm_crh_cust_oact_rec_ri record per (user_id, business_flag),
--       restricted to the tracked business_flag codes
--   t2  dm_crh_cust_info_ri customer info, joined on user_id
--   t5  dm_crh_user_vidro_ri video-flow records left-joined to dm_crh_audit_rec_ri
--       audit records (join_position_str = request_no), joined on user_id
--
-- cust_curr_step and cust_curr_step_num are picked by a fixed priority, highest first:
-- 开户成功 (13), 人工审核 (12), 提交申请 (11), 视屏验证 (10), ... , 新开户 (1).
--
-- NOTE(review): the 开户成功, 人工审核 and video branches test t2 or t5 columns, which
-- are matched per user rather than per t1 record. Once such a record arrives, every
-- t1 row of that user is re-labelled. The rows under the old (user_id, cust_curr_step)
-- keys are then retracted (the upsert sink can turn that into DELETEs), and the
-- user's rows collapse onto fewer keys. This is a likely cause of rows missing from
-- MySQL. Confirm whether a full per-step history or only the latest step is intended.
-- NOTE(review): '视屏验证' looks like a typo for '视频验证', but it is the value written
-- to MySQL, so it is kept unchanged.
create view v_dm_cust_oact_prog_ri as
select
     cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    -- Current wall-clock time, re-evaluated whenever a row or an update is emitted.
    ,FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time
from
(
    -- Keep only the latest occurrence of each (user_id, cust_curr_step).
    select
         cust_id
        ,cust_nme
        ,cust_mob_tel
        ,cust_curr_step
        ,cust_curr_step_num
        ,cust_curr_step_occu_tm
        ,user_id
        ,ROW_NUMBER() OVER(PARTITION BY user_id,cust_curr_step ORDER BY cust_curr_step_occu_tm DESC) AS rn
    from
    (
        select
             t2.client_id as cust_id
            ,t2.client_name as cust_nme
            ,t2.mobile_tel as cust_mob_tel
            ,case when t2.active_datetime is not null then '开户成功'
                  when t5.business_flag_audit in ('1003','1011') then '人工审核'
                  when t1.business_flag = '22114' then '提交申请'
                  when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then '视屏验证'
                  when t1.business_flag = '22113' then '绑定三方存管'
                  when t1.business_flag = '22112' then '设置密码'
                  when t1.business_flag = '22109' then '协议签署'
                  when t1.business_flag = '22108' then '开通账户选择'
                  when t1.business_flag = '22110' then '风险评测'
                  when t1.business_flag = '22106' then '填写基本资料'
                  when t1.business_flag = '22107' then '上传身份证'
                  when t1.business_flag = '22111' then '选择营业部'
                  when t1.business_flag = '12100' then '新开户:注册申请开户' end as cust_curr_step
            ,case when t2.active_datetime is not null then 13
                  when t5.business_flag_audit in ('1003','1011') then 12
                  when t1.business_flag = '22114' then 11
                  when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then 10
                  when t1.business_flag = '22113' then 9
                  when t1.business_flag = '22112' then 8
                  when t1.business_flag = '22109' then 7
                  when t1.business_flag = '22108' then 6
                  when t1.business_flag = '22110' then 5
                  when t1.business_flag = '22106' then 4
                  when t1.business_flag = '22107' then 3
                  when t1.business_flag = '22111' then 2
                  when t1.business_flag = '12100' then 1 end as cust_curr_step_num
            -- The occurrence time must come from the record that decided the step, so
            -- these branches mirror the cust_curr_step priority above. Testing only
            -- whether an audit or video record exists would stamp a t1-derived step
            -- (for example 设置密码) with an unrelated audit or video time.
            ,case when t2.active_datetime is not null then t2.active_datetime
                  when t5.business_flag_audit in ('1003','1011') then t5.curr_datetime
                  when t1.business_flag in ('22114','22115','33500') then t1.curr_datetime
                  when t5.business_flag_video in ('1200','1202','1203') then t5.create_datetime
                  else t1.curr_datetime end as cust_curr_step_occu_tm
            ,t1.user_id
        from
        (
            -- t1: latest record per (user_id, business_flag) among the tracked codes.
            select
                 curr_datetime
                ,user_id
                ,business_flag
            from
            (
                select
                     replace(curr_datetime,'-','') as curr_datetime
                    ,user_id
                    ,business_flag
                    ,ROW_NUMBER() OVER(PARTITION BY user_id,business_flag ORDER BY curr_datetime DESC) AS rn
                from dm_crh_cust_oact_rec_ri
                where business_flag in ('22114','22113','22112','22109','22108','22110','22106','22107','22111','12100','22115','33500')
                  -- user_id is part of the sinks' NOT NULL primary key, so a NULL here
                  -- could never be written. Filter it before the joins and rankings.
                  and user_id is not null
            ) t
            where rn = 1
        ) t1
        left join
        (
            -- t2: customer info. active_datetime is cut to 19 chars with '-' removed.
            select
                 user_id
                ,client_name
                ,mobile_tel
                ,replace(substr(active_datetime,1,19),'-','') as active_datetime
                ,client_id
            from dm_crh_cust_info_ri
        ) t2
        on t1.user_id = t2.user_id
        left join
        (
            -- t5: video-flow records with their matching audit record (if any),
            -- kept when either side carries one of the tracked flags.
            select
                 t1.user_id
                ,t1.join_position_str
                ,replace(t1.create_datetime,'-','') as create_datetime
                ,t1.business_flag AS business_flag_video
                ,t2.business_flag AS business_flag_audit
                ,replace(t2.curr_datetime,'-','') as curr_datetime
            from dm_crh_user_vidro_ri t1
            left join dm_crh_audit_rec_ri t2
            on t1.join_position_str = t2.request_no
            where t1.business_flag in ('1200','1202','1203')
               or t2.business_flag in ('1003','1011')
        ) t5
        on t1.user_id = t5.user_id
    ) t
    where cust_curr_step is not null
) t
where rn = 1
;
-- Write the view to both sinks from ONE job. Two stand-alone INSERT statements are
-- submitted as two independent jobs. Each re-runs the four-stream join with its own
-- state, so the print output and the MySQL table are not guaranteed to agree.
-- NOTE(review): EXECUTE STATEMENT SET BEGIN ... END is the syntax of recent Flink SQL
-- clients. Older versions use BEGIN STATEMENT SET ... END instead - confirm against
-- the Flink version in use.
EXECUTE STATEMENT SET
BEGIN
insert into dm_cust_oact_prog_ri_print
select
     cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    ,tech_sys_time
from v_dm_cust_oact_prog_ri
;
insert into dm_cust_oact_prog_ri
select
     cust_id
    ,cust_nme
    ,cust_mob_tel
    ,cust_curr_step
    ,cust_curr_step_num
    ,cust_curr_step_occu_tm
    ,user_id
    ,tech_sys_time
from v_dm_cust_oact_prog_ri
;
END;
】
小昌同学
ccc0606fighting@163.com