Posted to issues@flink.apache.org by "Marta Paes Moreira (Jira)" <ji...@apache.org> on 2020/08/05 06:57:00 UTC

[jira] [Comment Edited] (FLINK-18758) Support debezium data type in debezium format

    [ https://issues.apache.org/jira/browse/FLINK-18758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171297#comment-17171297 ] 

Marta Paes Moreira edited comment on FLINK-18758 at 8/5/20, 6:56 AM:
---------------------------------------------------------------------

+1, thanks for documenting this issue.

I've come across this while setting up a demo with Flink SQL + Debezium (with Postgres). In addition to making it hard to work with timestamps, it also prevents users from using the JDBC catalogs to create the changelog table, since the types diverge between the Postgres table and Kafka; I couldn't work around this with computed columns using a CAST(... AS TIMESTAMP).

NUMERIC/DECIMAL columns were also an issue, but AFAIU this is tied to Kafka itself. As a workaround, I refrained from using DECIMAL and set "numeric.mapping": "best_fit" in the Debezium connector properties.
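
For reference, that workaround lives in the connector registration payload submitted to Kafka Connect. A sketch of what such a config might look like; only the "numeric.mapping": "best_fit" entry comes from this thread, all other field values are illustrative placeholders:

```json
{
  "name": "pg-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "demo",
    "database.server.name": "dbserver1",
    "numeric.mapping": "best_fit"
  }
}
```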


was (Author: morsapaes):
+1, thanks for documenting this issue.

 I've come across this while setting up a demo with Flink SQL+Debezium (with Postgres). In addition to making it hard to work with timestamps, it also prevents users from using the JDBC catalogs to create the changelog table (since the types diverge between the Postgres table and Kafka).

NUMERIC/DECIMAL columns were also an issue, but AFAIU this is tied to Kafka itself. As a workaround, I refrained from using DECIMAL and set "numeric.mapping": "best_fit" in the Debezium connector properties.

> Support debezium data type in debezium format 
> ----------------------------------------------
>
>                 Key: FLINK-18758
>                 URL: https://issues.apache.org/jira/browse/FLINK-18758
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Leonard Xu
>            Priority: Major
>
> Currently, the debezium-json format wraps the json format to serialize/deserialize Debezium JSON data.
> But Debezium JSON has its own data types, each consisting of a Literal Type and a Semantic Type [1]. E.g., the Date type in Debezium is an integer that represents the number of days since epoch, rather than a string in the 'yyyy-MM-dd' pattern.
> {code:java}
> {
>    "schema":{
>       "fields":[
>          {
>             "fields":[
>                {
>                   "type":"int32",                 // Literal Type
>                   "optional":false,
>                   "name":"io.debezium.time.Date", // Semantic Type
>                   "version":1,
>                   "field":"order_date"
>                },
>                {
>                   "type":"int32",
>                   "optional":false,
>                   "field":"quantity"
>                }
>             ]
>          }
>       ]
>    },
>    "payload":{
>       "before":null,
>       "after":{
>          "order_date":16852,  // Literal Value: the number of days since epoch
>          "quantity":1
>       },
>       "op":"c",
>       "ts_ms":1596081813474
>    }
> }
> {code}
>  
> I think we need to obtain the Debezium data type from the schema information and then serialize/deserialize the data in the payload accordingly.
> [1][https://debezium.io/documentation/reference/1.2/connectors/mysql.html]
>  
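
To make the "days since epoch" / "milliseconds since epoch" semantics concrete, here is a minimal sketch in plain Python (not the actual Flink/Java deserializer; the helper names are hypothetical) that decodes the two temporal values from the example payload above:

```python
from datetime import date, datetime, timedelta, timezone

EPOCH = date(1970, 1, 1)

def decode_debezium_date(epoch_days: int) -> date:
    # io.debezium.time.Date: literal type int32 counting days since epoch
    return EPOCH + timedelta(days=epoch_days)

def decode_ts_ms(ms: int) -> datetime:
    # ts_ms: milliseconds since epoch; integer timedelta arithmetic avoids
    # float rounding in the sub-second part
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=ms)

# Values taken from the example payload:
print(decode_debezium_date(16852))   # -> 2016-02-21
print(decode_ts_ms(1596081813474))   # -> 2020-07-30 04:03:33.474000+00:00
```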



--
This message was sent by Atlassian Jira
(v8.3.4#803005)