Posted to user@flink.apache.org by Harshvardhan Shinde <ha...@oyorooms.com> on 2021/09/22 15:10:04 UTC
Can't access Debezium metadata fields in Kafka table
Hi,
I'm trying to access the metadata columns from the Debezium source
connector, as documented here:
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#available-metadata>
However, I'm getting the following error when I try to select rows from
the Kafka table:
flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts'
I'm getting the same error for all the *virtual* columns. Please let me know
what I'm doing wrong.
Here's my table creation query:
CREATE TABLE testFlink (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
id BIGINT,
number BIGINT,
created_at BIGINT,
updated_at BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
'properties.bootstrap.servers' = '<BROKER_URL>:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-avro-confluent',
'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
);
Thanks.
Re: Can't access Debezium metadata fields in Kafka table
Posted by Leonard Xu <xb...@gmail.com>.
Hi Harshvardhan,
The debezium-avro-confluent format doesn't support reading metadata yet [1]. The formats that do support it include debezium-json, canal-json and maxwell-json; you can try one of those instead.
Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-20454
> On Sep 24, 2021, at 14:00, Harshvardhan Shinde <ha...@oyorooms.com> wrote:
>
> Hi,
> Here's the complete error log:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts' of table 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
> topic
> partition
> headers
> leader-epoch
> offset
> timestamp
> timestamp-type
>
> I'll need some more time to test it with the debezium-json format.
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform
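For reference, a minimal sketch of what the suggestion above could look like in practice: the same DDL with 'value.format' switched to 'debezium-json' and the schema-registry option dropped. It assumes the topic carries JSON-encoded Debezium change events (for example, produced with Kafka Connect's JsonConverter) rather than Avro; the table name testFlinkJson and the '<JSON_TOPIC>' placeholder are hypothetical.

```sql
-- Sketch, assuming the topic holds JSON-encoded Debezium events.
-- testFlinkJson and '<JSON_TOPIC>' are hypothetical placeholders.
CREATE TABLE testFlinkJson (
  -- 'value.' prefix: these keys are provided by the value format, not by the Kafka connector
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<JSON_TOPIC>',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
  -- If the JSON messages embed the schema (JsonConverter with schemas.enable=true),
  -- additionally set: 'value.debezium-json.schema-include' = 'true'
);
```

Note that switching the format only changes how Flink decodes the topic: if the Debezium connector keeps writing Avro through the schema registry, this table will not be able to read those messages.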
Re: Can't access Debezium metadata fields in Kafka table
Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
Hi,
Here's the complete error log:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts' of table 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
topic
partition
headers
leader-epoch
offset
timestamp
timestamp-type
I'll need some more time to test it with the debezium-json format.
On Fri, Sep 24, 2021 at 12:52 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi,
> could you please share the full error message?
> I think it should list the supported metadata columns.
>
> Do you see the same error with the 'debezium-json' format instead of
> 'debezium-avro-confluent'?
>
> Regards,
> Roman
--
Thanks and Regards,
Harshvardhan
Data Platform
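The error message above lists the metadata keys that the Kafka connector itself exposes (topic, partition, headers, leader-epoch, offset, timestamp, timestamp-type); unlike the 'value.*' keys, these do not depend on the value format. A hedged sketch, assuming only Kafka-level information is needed while staying on debezium-avro-confluent: columns like the following should pass validation. The table and column names are hypothetical, and the types follow the Flink 1.13 Kafka connector documentation. Keep in mind that the Kafka record 'timestamp' (CreateTime or LogAppendTime, depending on the topic configuration) is not Debezium's source timestamp, so it only approximates event_time.

```sql
-- Sketch: keep the Avro format and read only connector-level metadata,
-- i.e. keys WITHOUT the 'value.' prefix. Table/column names are hypothetical.
CREATE TABLE testFlinkKafkaMeta (
  kafka_topic STRING METADATA FROM 'topic' VIRTUAL,
  kafka_partition INT METADATA FROM 'partition' VIRTUAL,
  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,
  -- Kafka record timestamp, not the Debezium source timestamp
  kafka_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
);
```

Since Debezium names topics <server>.<schema>.<table> by default, kafka_topic may partly stand in for origin_schema and origin_table, but the database name and the source timestamp remain unavailable until FLINK-20454 is resolved.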
Re: Can't access Debezium metadata fields in Kafka table
Posted by Roman Khachatryan <ro...@apache.org>.
Hi,
could you please share the full error message?
I think it should list the supported metadata columns.
Do you see the same error with the 'debezium-json' format instead of
'debezium-avro-confluent'?
Regards,
Roman