Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2023/02/09 04:03:21 UTC

Flink canal json format: ignore unrecognized type values

Different cloud vendors' data synchronization tools behave inconsistently when doing full + incremental synchronization of MySQL data to Kafka in canal json format.
Some vendors also write DDL statements to the topic, which the downstream Flink SQL job cannot recognize, so it throws an error. I suggest that the Flink canal json format simply ignore unrecognized type values when parsing. For example:
Example 1:
{"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby` varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255) DEFAULT NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255) DEFAULT NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,  `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255) DEFAULT NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255) DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}


Example 2:
{
    "action":"ALTER",
    "before":[],
    "bid":0,
    "data":[],
    "db":"db_test",
    "dbValType":{
        "col1":"varchar(22)",
        "col2":"varchar(22)",
        "col_pk":"varchar(22)"
    },
    "ddl":true,
    "entryType":"ROWDATA",
    "execTs":1669789188000,
    "jdbcType":{
        "col1":12,
        "col2":12,
        "col_pk":12
    },
    "pks":[],
    "schema":"db_test",
    "sendTs":1669789189533,
    "sql":"alter table table_test add col2 varchar(22) null",
    "table":"table_test",
    "tableChanges":{
        "table":{
            "columns":[
                {
                    "jdbcType":12, // jdbc 类型。
                    "name":"col1",    // 字段名称。
                    "position":0,  // 字段的顺序。
                    "typeExpression":"varchar(22)", // 类型描述。
                    "typeName":"varchar" // 类型名称。
                },
                {
                    "jdbcType":12,
                    "name":"col2",
                    "position":1, 
                    "typeExpression":"varchar(22)", 
                    "typeName":"varchar" 
                },
                {
                    "jdbcType":12, 
                    "name":"col_pk",   
                    "position":2,  
                    "typeExpression":"varchar(22)", 
                    "typeName":"varchar" 
                }
            ],
            "primaryKeyColumnNames":["col_pk"] // 主键名列表。
        },
        "type":"ALTER"
    }
}
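
For context, here is a minimal sketch of the kind of downstream Flink SQL source table that reads such a topic with the canal-json format. The table name, topic, Kafka address, and the subset of columns are illustrative assumptions based on the DDL in example 1, not details given in this thread. With the default format options, a record whose "type" is not a recognized row-change event, such as the INIT_DDL record in example 1, fails deserialization and the job exits, which is the behavior reported below.

-- Hypothetical downstream table; topic, servers, and columns are assumptions for illustration.
CREATE TABLE oms_parcels_src (
    id             STRING,
    trackingnumber STRING,
    weight         DECIMAL(19, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'oms_parcels',                             -- assumed topic name
    'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder address
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'canal-json'                              -- parses the canal JSON envelope
);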

Re: Re: Flink canal json format: ignore unrecognized type values

Posted by Weihua Hu <hu...@gmail.com>.
Hi,
You can try json.ignore-parse-errors [1] to ignore records that fail to parse. Note that this option will ignore all parse errors.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors
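
For a table declared with 'format' = 'canal-json', the corresponding option key carries the format prefix, i.e. canal-json.ignore-parse-errors. A sketch, reusing the hypothetical table from the first message (topic and server address are still placeholders); note again that every record that fails to parse is skipped silently, not only the DDL events:

-- Hypothetical table as above, now skipping records that cannot be parsed.
CREATE TABLE oms_parcels_src (
    id             STRING,
    trackingnumber STRING,
    weight         DECIMAL(19, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'oms_parcels',                             -- assumed topic name
    'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder address
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'canal-json',
    'canal-json.ignore-parse-errors' = 'true'            -- drop any record that fails to parse
);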

Best,
Weihua


On Mon, Feb 20, 2023 at 10:14 AM casel.chen <ca...@126.com> wrote:

> The log reports exactly this: "type":"INIT_DDL" is not recognized, and then the job exits.

Re: Re: Flink canal json format: ignore unrecognized type values

Posted by "casel.chen" <ca...@126.com>.
The log reports exactly this: "type":"INIT_DDL" is not recognized, and then the job exits.

On 2023-02-20 09:58:56, "Shengkai Fang" <fs...@gmail.com> wrote:
>Hi. Could you also share the SQL that reproduces this case and the related error stack trace?
>
>Best,
>Shengkai

Re: Flink canal json format: ignore unrecognized type values

Posted by Shengkai Fang <fs...@gmail.com>.
Hi. Could you also share the SQL that reproduces this case and the related error stack trace?

Best,
Shengkai
