You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/01/28 04:34:00 UTC

[jira] [Commented] (FLINK-21172) canal-json format include es field

    [ https://issues.apache.org/jira/browse/FLINK-21172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273299#comment-17273299 ] 

Jark Wu commented on FLINK-21172:
---------------------------------

We can support this by a new metadata column. What do you think [~nicholasjiang]?

> canal-json format include es field
> ----------------------------------
>
>                 Key: FLINK-21172
>                 URL: https://issues.apache.org/jira/browse/FLINK-21172
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.12.0, 1.12.1
>            Reporter: jiabao sun
>            Priority: Minor
>
> Canal flat message json format has an 'es' field extracted from mysql binlog which means the row data real change time in mysql. It expressed the event time naturally but was ignored during deserialization.
> {code:json}
> {
>   "data": [
>     {
>       "id": "111",
>       "name": "scooter",
>       "description": "Big 2-wheel scooter",
>       "weight": "5.18"
>     }
>   ],
>   "database": "inventory",
>   "es": 1589373560000,
>   "id": 9,
>   "isDdl": false,
>   "mysqlType": {
>     "id": "INTEGER",
>     "name": "VARCHAR(255)",
>     "description": "VARCHAR(512)",
>     "weight": "FLOAT"
>   },
>   "old": [
>     {
>       "weight": "5.15"
>     }
>   ],
>   "pkNames": [
>     "id"
>   ],
>   "sql": "",
>   "sqlType": {
>     "id": 4,
>     "name": 12,
>     "description": 12,
>     "weight": 7
>   },
>   "table": "products",
>   "ts": 1589373560798,
>   "type": "UPDATE"
> }
> {code}
> org.apache.flink.formats.json.canal. CanalJsonDeserializationSchema
> {code:java}
>     private static RowType createJsonRowType(DataType databaseSchema) {
>         // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
>         return (RowType)
>                 DataTypes.ROW(
>                                 DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("type", DataTypes.STRING()),
>                                 DataTypes.FIELD("database", DataTypes.STRING()),
>                                 DataTypes.FIELD("table", DataTypes.STRING()))
>                         .getLogicalType();
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)