Posted to dev@flink.apache.org by Ran Tao <ch...@gmail.com> on 2022/08/25 12:04:52 UTC

[DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

```
create table test_source (
  __test_metadata__ varchar METADATA,
  f0 varchar,
  f1 varchar,
  f2 bigint,
  ts as CURRENT_TIMESTAMP
) with (
  'connector' = 'test',
  ...
)
```

If we do not pre-define `__test_metadata__` as a metadata key by implementing
`listReadableMetadata`, running the above SQL causes an exception like this:

org.apache.flink.table.api.ValidationException: Invalid metadata key
'__test_metadata__' in column '__test_metadata__' of table
'default_catalog.default_database.test_source'. The DynamicTableSource
class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
supports the following metadata keys for reading:
xxx, yyy

at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)

This is because a Flink metadata column must currently exist in the results
returned by `listReadableMetadata`. So when a certain connector adds some
metadata, we cannot use it directly unless we modify the connector code to
support it, which in some situations is intolerable. Can we support a
'DYNAMIC MetadataColumn'? Its basic mechanism is to skip checking the column
against the existing metadata, so users can define it dynamically. If a
certain connector lacks this metadata, the column returns null; otherwise it
returns the concrete value. This has great benefits in some scenarios.
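
For reference, this validation exists because a source declares a fixed set of
keys via the `SupportsReadingMetadata` ability. A minimal sketch of such a
connector (the class is hypothetical; the keys `xxx`/`yyy` just mirror the
exception above):

```
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.types.DataType;

public abstract class SketchSource implements ScanTableSource, SupportsReadingMetadata {

    @Override
    public Map<String, DataType> listReadableMetadata() {
        // The planner validates every METADATA column in the DDL against
        // exactly this map, so '__test_metadata__' is rejected.
        Map<String, DataType> metadata = new LinkedHashMap<>();
        metadata.put("xxx", DataTypes.STRING());
        metadata.put("yyy", DataTypes.BIGINT());
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // Remember which of the declared keys the query actually selected.
    }
}
```

Any METADATA column whose key is not in this map is rejected at planning time,
which is exactly the limitation discussed here.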

Looking forward to your opinions.


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Ran Tao <ch...@gmail.com>.
Hi Jark & Timo. I'd be glad to work on this feature. If you agree, I'll
create a FLIP, and then you and other developers can review the specifics.

Thanks.


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Jark Wu <im...@gmail.com>.
Thank you Ran for the explanation.

The column DEFAULT is a reasonable feature and can also help in other cases. 
I’m fine with adding this feature. 
Do you want to prepare a FLIP for it?

Best,
Jark



Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Ran Tao <ch...@gmail.com>.
Hi Jark. Timo summed it up very well. In fact, my problem is that the
current Flink table metadata is fixed and cannot accommodate changes in a
connector's metadata columns.
A metadata column that did not exist in the past may exist at some point
in the future, and vice versa.
Both forward and backward compatibility are involved here.


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Ran Tao <ch...@gmail.com>.
Your understanding is correct. In fact, my question is very simple: the
metadata of a Flink table is currently fixed and cannot stay compatible
with changes in the connector.

What you said about forward and backward compatibility is very accurate.
The 'DEFAULT' constraint is indeed a good way, and the SQL form is very
similar to MySQL's DDL defaults.

It is indeed a more general approach. Can you evaluate this requirement?
If so, can Flink support it?


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Timo Walther <tw...@apache.org>.
Hi Ran,

so if I understand it correctly, the problem here is not only backward
compatibility but also forward compatibility. You might run different
versions of your connector; some of them offer a metadata key A and some
don't offer it yet. But the DDL should work for both connector
implementations, right?

What I could imagine here is that we implement the DEFAULT constraint. A 
DDL could then look like this:

CREATE TABLE x (
   col_s0 STRING METADATA DEFAULT NULL,
   col_s1 STRING METADATA DEFAULT 'unknown',
   col_s2 STRING DEFAULT 'unknown'
)

A first version could support only metadata columns, but it would be more
consistent to implement it for all column types from the very beginning.
col_s0 is a special case.
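
To make the semantics concrete, here is a rough sketch of how the
planner-side resolution could work (just an assumption for discussion, not
existing code in `DynamicSourceUtils`):

```
import java.util.Map;
import java.util.Optional;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;

final class MetadataDefaultSketch {

    /**
     * Resolves a METADATA column: returns the connector's type if the key is
     * supported, falls back to the declared DEFAULT if not, and only throws
     * when the key is unknown and no default exists (today's behavior).
     */
    static Optional<DataType> resolveMetadataColumn(
            String metadataKey,
            boolean hasDeclaredDefault,
            Map<String, DataType> readableMetadata) {
        DataType supportedType = readableMetadata.get(metadataKey);
        if (supportedType != null) {
            return Optional.of(supportedType); // read the real metadata value
        }
        if (hasDeclaredDefault) {
            return Optional.empty(); // plan the column as its DEFAULT expression
        }
        throw new ValidationException("Invalid metadata key '" + metadataKey + "'.");
    }
}
```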

What do you think?

Regards,
Timo




Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Jark Wu <im...@gmail.com>.
Hi Ran,

If the metadata is from the message properties, then you can manually cast it to your preferred type,
such as `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.

If the metadata is not from the message properties, how does the connector know which field to convert from? 
Shouldn’t the connector be modified to support this new metadata column?

Best,
Jark





Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Ran Tao <ch...@gmail.com>.
Hi, Timo. I think using one map column, as in the Debezium format you
illustrated above, can't cover the discussed scenario.
It's not the same thing.

Here is a debezium format example from flink docs: [1]

```
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json',
  ...
);
```

The `origin_properties` column is used for properties, so we define it as a
MAP (just as you suggested). But the other metadata columns have their own
data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten all metadata
columns into one MAP<STRING, STRING> column; it's not a good idea.

My suggestion is for the case where the Kafka connector above adds some new
metadata (just as an example; Kafka may be stable, but a certain connector
or middleware might still be evolving, so its metadata could be added or
changed), e.g. at some point Kafka adds a `host_name` metadata key
(indicating the address of the message broker).

We can define the SQL like this:
```
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  host_name STRING METADATA VIRTUAL DYNAMIC,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json',
  ...
);
```
Then users can use the `host_name` metadata: because it is a DYNAMIC
metadata column, Flink does not throw an exception even though `host_name`
did not belong to Kafka before, and developers do not need to modify or
rebuild the Flink source code and publish it to the online environment
(which comes at a high cost).

Considering the return value:
Kafka before (metadata not yet available): null
Kafka now (metadata already added): the concrete value
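
In other words, the validation change could look roughly like this (purely
illustrative; the method and names are hypothetical, not existing Flink code
in `DynamicSourceUtils`):

```
import java.util.Map;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;

final class DynamicMetadataSketch {

    /** Validates one METADATA column against the connector's declared keys. */
    static void validateMetadataColumn(
            String key, boolean isDynamic, Map<String, DataType> readableMetadata) {
        if (readableMetadata.containsKey(key)) {
            return; // supported: the concrete value is read at runtime
        }
        if (isDynamic) {
            return; // DYNAMIC: tolerated; the column evaluates to NULL at runtime
        }
        // current behavior: an unknown key fails the query at planning time
        throw new ValidationException("Invalid metadata key '" + key + "'.");
    }
}
```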

The same user SQL works in the past, now, and even in the future, rather
than checking and rejecting these new metadata columns or frequently
modifying the connector implementation to support them. And it is opt-in,
configured by marking the metadata column 'DYNAMIC' (or some better
implementation).

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

Posted by Timo Walther <tw...@apache.org>.
Hi Ran,

what would be the data type of this dynamic metadata column? The planner 
and many parts of the stack will require a data type.

Personally, I feel connector developers can already have the same
functionality by declaring a metadata column as `MAP<STRING, STRING>`.
This is what we expose already as `debezium.source.properties`. Whatever
Debezium adds will be available through this property and can be
accessed via `SELECT col['my-new-property'] FROM x`, including being NULL
by default if not present.

Regards,
Timo

