You are viewing a plain text version of this content; the hyperlink to the canonical HTML version was removed during conversion.
Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2023/02/09 04:03:21 UTC
flink canal json格式忽略不识别的type
不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal json格式解析时直接忽略不识别的type,例如
例1:
{"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
例2:
{
"action":"ALTER",
"before":[],
"bid":0,
"data":[],
"db":"db_test",
"dbValType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"ddl":true,
"entryType":"ROWDATA",
"execTs":1669789188000,
"jdbcType":{
"col1":12,
"col2":12,
"col_pk":12
},
"pks":[],
"schema":"db_test",
"sendTs":1669789189533,
"sql":"alter table table_test add col2 varchar(22) null",
"table":"table_test",
"tableChanges":{
"table":{
"columns":[
{
"jdbcType":12, // jdbc 类型。
"name":"col1", // 字段名称。
"position":0, // 字段的顺序。
"typeExpression":"varchar(22)", // 类型描述。
"typeName":"varchar" // 类型名称。
},
{
"jdbcType":12,
"name":"col2",
"position":1,
"typeExpression":"varchar(22)",
"typeName":"varchar"
},
{
"jdbcType":12,
"name":"col_pk",
"position":2,
"typeExpression":"varchar(22)",
"typeName":"varchar"
}
],
"primaryKeyColumnNames":["col_pk"] // 主键名列表。
},
"type":"ALTER"
}
}
Re: Re: flink canal json格式忽略不识别的type
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
可以尝试使用: json.ignore-parse-errors[1] 来忽略解析的报错,需要注意这个参数会忽略所有的解析错误
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors
Best,
Weihua
On Mon, Feb 20, 2023 at 10:14 AM casel.chen <ca...@126.com> wrote:
> 日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-02-20 09:58:56,"Shengkai Fang" <fs...@gmail.com> 写道:
> >Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
> >
> >Best,
> >Shengkai
> >
> >casel.chen <ca...@126.com> 于2023年2月9日周四 12:03写道:
> >
> >> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
> >> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
> >> json格式解析时直接忽略不识别的type,例如
> >> 例1:
> >>
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
> >> TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby`
> >> varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT
> >> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp
> NOT
> >> NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT
> >> NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255)
> DEFAULT
> >> NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255)
> DEFAULT
> >> NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext,
> >> `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255)
> DEFAULT
> >> NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT
> >> NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255)
> >> DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB
> DEFAULT
> >> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
> >>
> >>
> >> 例2:
> >> {
> >> "action":"ALTER",
> >> "before":[],
> >> "bid":0,
> >> "data":[],
> >> "db":"db_test",
> >> "dbValType":{
> >> "col1":"varchar(22)",
> >> "col2":"varchar(22)",
> >> "col_pk":"varchar(22)"
> >> },
> >> "ddl":true,
> >> "entryType":"ROWDATA",
> >> "execTs":1669789188000,
> >> "jdbcType":{
> >> "col1":12,
> >> "col2":12,
> >> "col_pk":12
> >> },
> >> "pks":[],
> >> "schema":"db_test",
> >> "sendTs":1669789189533,
> >> "sql":"alter table table_test add col2 varchar(22) null",
> >> "table":"table_test",
> >> "tableChanges":{
> >> "table":{
> >> "columns":[
> >> {
> >> "jdbcType":12, // jdbc 类型。
> >> "name":"col1", // 字段名称。
> >> "position":0, // 字段的顺序。
> >> "typeExpression":"varchar(22)", // 类型描述。
> >> "typeName":"varchar" // 类型名称。
> >> },
> >> {
> >> "jdbcType":12,
> >> "name":"col2",
> >> "position":1,
> >> "typeExpression":"varchar(22)",
> >> "typeName":"varchar"
> >> },
> >> {
> >> "jdbcType":12,
> >> "name":"col_pk",
> >> "position":2,
> >> "typeExpression":"varchar(22)",
> >> "typeName":"varchar"
> >> }
> >> ],
> >> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
> >> },
> >> "type":"ALTER"
> >> }
> >> }
>
Re:Re: flink canal json格式忽略不识别的type
Posted by "casel.chen" <ca...@126.com>.
日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了
在 2023-02-20 09:58:56,"Shengkai Fang" <fs...@gmail.com> 写道:
>Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
>
>Best,
>Shengkai
>
>casel.chen <ca...@126.com> 于2023年2月9日周四 12:03写道:
>
>> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
>> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
>> json格式解析时直接忽略不识别的type,例如
>> 例1:
>> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
>> TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby`
>> varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT
>> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT
>> NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT
>> NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT
>> NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT
>> NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext,
>> `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT
>> NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT
>> NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255)
>> DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
>> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>>
>>
>> 例2:
>> {
>> "action":"ALTER",
>> "before":[],
>> "bid":0,
>> "data":[],
>> "db":"db_test",
>> "dbValType":{
>> "col1":"varchar(22)",
>> "col2":"varchar(22)",
>> "col_pk":"varchar(22)"
>> },
>> "ddl":true,
>> "entryType":"ROWDATA",
>> "execTs":1669789188000,
>> "jdbcType":{
>> "col1":12,
>> "col2":12,
>> "col_pk":12
>> },
>> "pks":[],
>> "schema":"db_test",
>> "sendTs":1669789189533,
>> "sql":"alter table table_test add col2 varchar(22) null",
>> "table":"table_test",
>> "tableChanges":{
>> "table":{
>> "columns":[
>> {
>> "jdbcType":12, // jdbc 类型。
>> "name":"col1", // 字段名称。
>> "position":0, // 字段的顺序。
>> "typeExpression":"varchar(22)", // 类型描述。
>> "typeName":"varchar" // 类型名称。
>> },
>> {
>> "jdbcType":12,
>> "name":"col2",
>> "position":1,
>> "typeExpression":"varchar(22)",
>> "typeName":"varchar"
>> },
>> {
>> "jdbcType":12,
>> "name":"col_pk",
>> "position":2,
>> "typeExpression":"varchar(22)",
>> "typeName":"varchar"
>> }
>> ],
>> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
>> },
>> "type":"ALTER"
>> }
>> }
Re: flink canal json格式忽略不识别的type
Posted by Shengkai Fang <fs...@gmail.com>.
Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
Best,
Shengkai
casel.chen <ca...@126.com> 于2023年2月9日周四 12:03写道:
> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
> json格式解析时直接忽略不识别的type,例如
> 例1:
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
> TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby`
> varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT
> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT
> NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT
> NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT
> NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT
> NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext,
> `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT
> NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT
> NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255)
> DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>
>
> 例2:
> {
> "action":"ALTER",
> "before":[],
> "bid":0,
> "data":[],
> "db":"db_test",
> "dbValType":{
> "col1":"varchar(22)",
> "col2":"varchar(22)",
> "col_pk":"varchar(22)"
> },
> "ddl":true,
> "entryType":"ROWDATA",
> "execTs":1669789188000,
> "jdbcType":{
> "col1":12,
> "col2":12,
> "col_pk":12
> },
> "pks":[],
> "schema":"db_test",
> "sendTs":1669789189533,
> "sql":"alter table table_test add col2 varchar(22) null",
> "table":"table_test",
> "tableChanges":{
> "table":{
> "columns":[
> {
> "jdbcType":12, // jdbc 类型。
> "name":"col1", // 字段名称。
> "position":0, // 字段的顺序。
> "typeExpression":"varchar(22)", // 类型描述。
> "typeName":"varchar" // 类型名称。
> },
> {
> "jdbcType":12,
> "name":"col2",
> "position":1,
> "typeExpression":"varchar(22)",
> "typeName":"varchar"
> },
> {
> "jdbcType":12,
> "name":"col_pk",
> "position":2,
> "typeExpression":"varchar(22)",
> "typeName":"varchar"
> }
> ],
> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
> },
> "type":"ALTER"
> }
> }