Posted to user@flink.apache.org by Harshvardhan Shinde <ha...@oyorooms.com> on 2021/09/22 15:10:04 UTC

Can't access Debezium metadata fields in Kafka table

Hi,
I'm trying to access the metadata columns from the Debezium source
connector, as documented here:
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#available-metadata>
However, I'm getting the following error when I try to select rows from
the Kafka table:

flink.table.api.ValidationException: Invalid metadata key
'value.ingestion-timestamp' in column 'origin_ts'

I get the same error for all of the *virtual* columns. Please let me know
what I'm doing wrong.

Here's my table creation query:

CREATE TABLE testFlink (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
);

Thanks.

Re: Can't access Debezium metadata fields in Kafka table

Posted by Leonard Xu <xb...@gmail.com>.
Hi Harshvardhan,

The debezium-avro-confluent format doesn't support reading metadata yet [1]. The formats that do support it include debezium-json, canal-json and maxwell-json, so you could try one of those.
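A minimal sketch of that suggestion, adapted from the DDL in the original post. It assumes the Debezium connector is reconfigured to publish JSON (for example via Kafka Connect's JsonConverter) instead of Confluent Avro, because debezium-json cannot decode Avro-encoded records. The table name testFlinkJson and the topic placeholder <DEBEZIUM_JSON_TOPIC> are hypothetical, and the 'value.debezium-json.schema-include' setting is an assumption that has to match whether the JSON messages embed their schema.

```sql
-- Sketch only: assumes the topic carries Debezium JSON, not Confluent Avro.
CREATE TABLE testFlinkJson (
  -- Format-level metadata, readable here because value.format is debezium-json.
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  -- Physical columns decoded from the Debezium "before"/"after" payload.
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  -- Hypothetical placeholder for a topic written with a JSON converter.
  'topic' = '<DEBEZIUM_JSON_TOPIC>',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json',
  -- 'true' assumes JsonConverter with schemas.enable=true (schema embedded in each message);
  -- use 'false' if the messages carry only the payload.
  'value.debezium-json.schema-include' = 'true'
);
```

Apart from the format options, only the topic changes; the metadata column declarations are the same ones that failed validation under debezium-avro-confluent.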

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-20454



> On Sep 24, 2021, at 14:00, Harshvardhan Shinde <ha...@oyorooms.com> wrote:
> 
> Hi,
> Here's the complete error log:
>  
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts' of table 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
> topic
> partition
> headers
> leader-epoch
> offset
> timestamp
> timestamp-type
> 
> I'll need some more time to test it with the debezium-json format.


Re: Can't access Debezium metadata fields in Kafka table

Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
Hi,
Here's the complete error log:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts' of table 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
topic
partition
headers
leader-epoch
offset
timestamp
timestamp-type
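The keys in this list are the Kafka connector's own metadata, which KafkaDynamicSource reports as readable for this table. As a sketch, assuming Flink 1.13, they should be usable even while value.format stays debezium-avro-confluent. The table name testFlinkKafkaMeta and the kafka_* column names are hypothetical. Note that 'timestamp' is the Kafka record timestamp, not Debezium's source or ingestion timestamp, so it is at best a rough substitute for the fields originally requested.

```sql
-- Sketch: connector-level metadata keys (topic, partition, offset, timestamp)
-- are read by the Kafka connector itself, independent of the value format.
CREATE TABLE testFlinkKafkaMeta (
  kafka_topic STRING METADATA FROM 'topic' VIRTUAL,
  kafka_partition INT METADATA FROM 'partition' VIRTUAL,
  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,
  -- Kafka record timestamp (declared as TIMESTAMP_LTZ(3); cast to TIMESTAMP(3) here).
  kafka_ts TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  -- Unchanged Avro format: no 'value.*' metadata keys are declared above.
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
);
```

This keeps the existing Avro pipeline intact; reading value.source.* fields would still require a format that exposes them.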

I'll need some more time to test it with the debezium-json format.

On Fri, Sep 24, 2021 at 12:52 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
> could you please share the full error message?
> I think it should list the supported metadata columns.
>
> Do you see the same error with the 'debezium-json' format instead of
> 'debezium-avro-confluent'?
>
> Regards,
> Roman


-- 
Thanks and Regards,
Harshvardhan
Data Platform

Re: Can't access Debezium metadata fields in Kafka table

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,
could you please share the full error message?
I think it should list the supported metadata columns.

Do you see the same error with the 'debezium-json' format instead of
'debezium-avro-confluent'?

Regards,
Roman


On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde
<ha...@oyorooms.com> wrote:
>
> Hi,
> I'm trying to access the metadata columns from the debezium source connector as documented here.
> However I'm getting the following error when I try to select the rows from the kafka table:
>
> flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts'
>
> Getting the same issue for all the virtual columns. Please let me know what I'm doing wrong.