Posted to user@flink.apache.org by Frank Lyaruu <fl...@gmail.com> on 2023/03/06 13:47:39 UTC

Failing to process timestamp data from Kafka + Debezium Avro using Flink SQL

Hi all,
I'm trying to ingest change capture data from Kafka which contains
some timestamps.
I'm using Flink SQL, and I'm running into issues, specifically with the
created_at field.
In Postgres, it is of type 'timestamptz'.

My table definition is this:
CREATE TABLE contacts (
  contact_id STRING,
  first_name STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'film.public.contacts',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://redpanda:8081',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'redpanda:29092',
  'properties.group.id' = 'analytics'
);

And the data looks something like this:
....
"after": {
  "film.public.contacts.Value": {
    "contact_id": "51d43c3a-4c82-4418-a779-4cb0a1864fd0",
    "created_at": {
      "string": "2023-03-06T11:00:17.447018Z"
    },
    "first_name": "hank"
  }
},
"before": {
  "film.public.contacts.Value": {
    "contact_id": "51d43c3a-4c82-4418-a779-4cb0a1864fd0",
    "created_at": {
      "string": "2023-03-06T11:00:17.447018Z"
    },
    "first_name": "bart"
  }
},
"op": "u",
"source": {
  "connector": "postgresql"
}
....
The values arrive as ISO-8601 strings with microsecond precision, which
would correspond to timestamptz(6), and I've heard that isn't supported (issue:
https://issues.apache.org/jira/browse/FLINK-23589).
For my use case the microsecond precision isn't a big deal, I'd just like
to be able to parse it at all.

Right now I'm getting this exception: 'Caused by:
org.apache.avro.AvroTypeException: Found string, expecting union'

Any known workarounds? Can I parse it 'manually' using a UDF? If I omit the
created_at field from my query it works fine, but I do need it.
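
One possible direction, as an untested sketch rather than a confirmed fix: since
the field arrives as a string on the wire, declare created_at as STRING and
derive the timestamp in a computed column with built-in functions instead of a
UDF. The column name created_ts is made up for illustration. The sketch assumes
every value has at least three fractional digits, as in the sample above, and
it drops the trailing 'Z', so the result is a zone-less TIMESTAMP(3) holding
the UTC wall-clock time. Whether this gets past the AvroTypeException has not
been verified here.

```sql
-- Untested sketch: read created_at as the string it is on the wire,
-- then derive a TIMESTAMP(3) from it in a computed column.
CREATE TABLE contacts (
  contact_id STRING,
  first_name STRING,
  created_at STRING,
  -- created_ts is a hypothetical column name.
  -- Keep the first 23 characters (e.g. '2023-03-06T11:00:17.447'),
  -- replace the 'T' with a space, and parse with millisecond precision.
  -- Assumes at least three fractional digits are always present.
  created_ts AS TO_TIMESTAMP(
    REPLACE(SUBSTRING(created_at FROM 1 FOR 23), 'T', ' '),
    'yyyy-MM-dd HH:mm:ss.SSS'
  )
) WITH (
  'connector' = 'kafka',
  'topic' = 'film.public.contacts',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://redpanda:8081',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'redpanda:29092',
  'properties.group.id' = 'analytics'
);

-- Query the derived column instead of the raw string.
SELECT contact_id, first_name, created_ts FROM contacts;
```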

regards, Frank