Posted to dev@flink.apache.org by Jingsong Li <ji...@gmail.com> on 2022/04/29 09:54:20 UTC

[DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Hi devs,

I want to start a discussion about Schema Evolution on the Flink Table
Store. [1]

In FLINK-21634, we plan to support many schema changes in Flink SQL.
But with the current Table Store, such changes may produce wrong data
and leave the evolution semantics unclear.

In general, users can perform these operations on a schema:
- Add column: Adding a column to a table.
- Modify column type.
- Drop column: Drop a column.
- Rename column: For example, rename the "name_1" column to "name_2".

Another kind of schema change affects the partition keys, because data
changes over time. For example, take a table partitioned by day: as the
business continues to grow, each new daily partition becomes larger,
and the business wants to switch to hourly partitions.

A simple approach is to rewrite all the existing data when modifying the schema.
But this is too expensive to be acceptable to users, so we need to
support schema changes without a rewrite and define their behavior clearly.
Modifying the schema does not rewrite the existing data; instead, the
original data is evolved to the current schema when it is read.
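
A minimal sketch of what evolving old data at read time could look like.
It assumes each column carries a stable field ID that survives renames;
the ID, the Field class and evolve_row are hypothetical names used only
for illustration and are not taken from this thread.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class Field:
    field_id: int  # hypothetical stable ID that survives renames
    name: str


def evolve_row(row: Dict[str, Any], file_schema: List[Field],
               current_schema: List[Field]) -> Dict[str, Optional[Any]]:
    """Project a row written under file_schema onto current_schema."""
    # Index the old row's values by field ID rather than by column name,
    # so a renamed column still finds its data.
    values_by_id = {f.field_id: row[f.name] for f in file_schema}
    # Added columns are absent from old files and are read as None;
    # dropped columns are simply never looked up.
    return {f.name: values_by_id.get(f.field_id) for f in current_schema}


# Old file: written before "name_1" was renamed, "obsolete" was dropped
# and "added" was added.
old_schema = [Field(1, "id"), Field(2, "name_1"), Field(3, "obsolete")]
new_schema = [Field(1, "id"), Field(2, "name_2"), Field(4, "added")]
old_row = {"id": 7, "name_1": "flink", "obsolete": True}

evolved = evolve_row(old_row, old_schema, new_schema)
print(evolved)
```

The old file is never touched; only the reader needs to know both the
schema the file was written with and the current schema.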

Looking forward to your feedback!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store

Best,
Jingsong

Re: [DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Posted by Jingsong Li <ji...@gmail.com>.
Thanks, Jark~

Hi all,

I just created a vote thread [1].
Feel free to -1 if you think there is something wrong with the design.

[1] https://lists.apache.org/thread/lg5txz95mgko4mp6fqcwt1dd1hbjctjy

Best,
Jingsong

On Thu, May 12, 2022 at 4:14 PM Jark Wu <im...@gmail.com> wrote:

> Thanks, Jingsong, for the explanation. I don't have any other concerns.
>
> Best,
> Jark

Re: [DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Posted by Jark Wu <im...@gmail.com>.
Thanks, Jingsong, for the explanation. I don't have any other concerns.

Best,
Jark

On Thu, 12 May 2022 at 09:53, Jingsong Li <ji...@gmail.com> wrote:

> Hi all,
>
> If there are no more comments, I'm going to start a vote.
>
> Best,
> Jingsong

Re: [DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Posted by Jingsong Li <ji...@gmail.com>.
Hi all,

If there are no more comments, I'm going to start a vote.

Best,
Jingsong

On Tue, May 10, 2022 at 10:37 AM Jingsong Li <ji...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for your feedback.
>
> > 1) Does table-store support evolving schemas multiple times during a
> > checkpoint?
>
> In this case the checkpoint is split into multiple commits, e.g.:
> - commit1: write 1 million rows
> - commit2: evolve schema 1
> - commit3: write 1 million rows
> ....
>
> Some work needs to be done on the connector side.
>
> > 2) Does ADD COLUMN support adding a NOT NULL column?
>
> I tend not to support it at this time.
> The other strategy is to support it, but report an error when data is
> read with the new schema, while ensuring that the data can still be read
> with the old schema.
>
> > 3) What's the matrix of type evolution? Do you support modifying a column
> > to any type?
>
> For type evolution, we currently only support type changes that are covered
> by implicit conversions (from Flink's LogicalTypeCasts).
> Three modes can be supported in the future for the user to choose from:
> - Default: implicit conversions only
> - Allow implicit and explicit conversions
>     - Throw an exception when a cast fails.
>     - Return null when a cast fails.
>
> I have updated the FLIP.
>
> Best,
> Jingsong

Re: [DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Posted by Jingsong Li <ji...@gmail.com>.
Hi Jark,

Thanks for your feedback.

> 1) Does table-store support evolving schemas multiple times during a
> checkpoint?

In this case the checkpoint is split into multiple commits, e.g.:
- commit1: write 1 million rows
- commit2: evolve schema 1
- commit3: write 1 million rows
....

Some work needs to be done on the connector side.
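
A rough sketch of that splitting, using the sequence from the question
above. The (kind, detail) tuples and split_into_commits are hypothetical
and only illustrate the idea that a schema change closes the pending
writes and becomes a commit of its own.

```python
from typing import List, Tuple

Op = Tuple[str, str]  # (kind, detail); kind is "write" or "evolve"


def split_into_commits(ops: List[Op]) -> List[List[Op]]:
    """Group operations so that every schema change is its own commit."""
    commits: List[List[Op]] = []
    pending_writes: List[Op] = []
    for op in ops:
        if op[0] == "evolve":
            # Flush the writes made under the previous schema first ...
            if pending_writes:
                commits.append(pending_writes)
                pending_writes = []
            # ... then record the schema change as a separate commit.
            commits.append([op])
        else:
            pending_writes.append(op)
    if pending_writes:
        commits.append(pending_writes)
    return commits


# cp1 -> write -> evolve schema1 -> write -> evolve schema2 -> write -> cp2
checkpoint_ops = [
    ("write", "1M rows"),
    ("evolve", "schema 1"),
    ("write", "1M rows"),
    ("evolve", "schema 2"),
    ("write", "1M rows"),
]
commits = split_into_commits(checkpoint_ops)
for number, commit in enumerate(commits, start=1):
    print(f"commit{number}: {commit}")
```

With this grouping, every data file inside one commit is written under a
single schema.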

> 2) Does ADD COLUMN support adding a NOT NULL column?

I tend not to support it at this time.
The other strategy is to support it, but report an error when data is
read with the new schema, while ensuring that the data can still be read
with the old schema.
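
A small sketch of the first option (reject the change up front). The
exception class and validate_add_column are hypothetical names, not an
existing Table Store API.

```python
class UnsupportedSchemaChange(Exception):
    """Raised when a requested schema change is not supported."""


def validate_add_column(name: str, nullable: bool) -> None:
    # Existing data files contain no values for a newly added column, so
    # old rows can only be read back as NULL for it. A NOT NULL column
    # would be violated by every existing row, hence the rejection.
    if not nullable:
        raise UnsupportedSchemaChange(
            f"Cannot add NOT NULL column '{name}': existing rows have no value for it"
        )


validate_add_column("note", nullable=True)  # accepted
try:
    validate_add_column("note", nullable=False)
except UnsupportedSchemaChange as error:
    print(error)
```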

> 3) What's the matrix of type evolution? Do you support modifying a column
> to any type?

For type evolution, we currently only support type changes that are covered
by implicit conversions (from Flink's LogicalTypeCasts).
Three modes can be supported in the future for the user to choose from:
- Default: implicit conversions only
- Allow implicit and explicit conversions
    - Throw an exception when a cast fails.
    - Return null when a cast fails.
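
The three modes can be sketched as follows. This is a toy model: the
CastMode names, the one-entry implicit-cast table and the Python
converters are assumptions made for illustration and do not reproduce
Flink's actual cast matrix.

```python
from enum import Enum
from typing import Any, Optional


class CastMode(Enum):
    IMPLICIT_ONLY = "implicit-only"    # default
    EXPLICIT_THROW = "explicit-throw"  # explicit casts allowed, fail loudly
    EXPLICIT_NULL = "explicit-null"    # explicit casts allowed, NULL on failure


# Toy subset of implicit (widening) conversions, for illustration only.
IMPLICIT_CASTS = {("INT", "BIGINT")}
# Toy converters standing in for real cast logic.
CONVERTERS = {"INT": int, "BIGINT": int, "STRING": str}


def evolve_value(value: Any, source: str, target: str,
                 mode: CastMode) -> Optional[Any]:
    """Convert a value stored as `source` to the column's new `target` type."""
    is_implicit = source == target or (source, target) in IMPLICIT_CASTS
    if not is_implicit and mode is CastMode.IMPLICIT_ONLY:
        raise TypeError(f"{source} -> {target} is not an implicit conversion")
    try:
        return CONVERTERS[target](value)
    except ValueError:
        if mode is CastMode.EXPLICIT_NULL:
            return None  # swallow the failed cast and read NULL
        raise            # EXPLICIT_THROW: surface the failed cast


print(evolve_value(1, "INT", "BIGINT", CastMode.IMPLICIT_ONLY))
print(evolve_value("42", "STRING", "INT", CastMode.EXPLICIT_THROW))
print(evolve_value("abc", "STRING", "INT", CastMode.EXPLICIT_NULL))
```

In the default mode, a change such as STRING to INT is rejected before
any data is read, so a cast can only fail at read time in the two
explicit modes.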

I have updated the FLIP.

Best,
Jingsong

On Mon, May 9, 2022 at 8:14 PM Jark Wu <im...@gmail.com> wrote:

> Thanks for proposing this exciting feature, Jingsong!
>
> I only have a few questions:
>
> 1) Does table-store support evolving schemas multiple times during a
> checkpoint?
> For example, cp1 -> write 1M rows (may flush file store) -> evolve schema1
> -> write 1M rows (may flush file store again) -> evolve schema2 -> write 1M
> rows -> cp2
>
> That means the new data files in this snapshot have different schemas.
> Besides, schemas may need to be registered before the checkpoint is complete.
>
> 2) Does ADD COLUMN support adding a NOT NULL column?
>
> 3) What's the matrix of type evolution? Do you support modifying a column
> to any type?
>
> Best,
> Jark

Re: [DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Posted by Jark Wu <im...@gmail.com>.
Thanks for proposing this exciting feature, Jingsong!

I only have a few questions:

1) Does table-store support evolving schemas multiple times during a
checkpoint?
For example, cp1 -> write 1M rows (may flush file store) -> evolve schema1
-> write 1M rows (may flush file store again) -> evolve schema2 -> write 1M
rows -> cp2

That means the new data files in this snapshot have different schemas.
Besides, schemas may need to be registered before the checkpoint is complete.

2) Does ADD COLUMN support adding a NOT NULL column?

3) What's the matrix of type evolution? Do you support modifying a column
to any type?

Best,
Jark



On Mon, 9 May 2022 at 16:44, Caizhi Weng <ts...@gmail.com> wrote:

> Hi all!
>
> +1 for this FLIP. By adding schema information to data files, we can not
> only support schema evolution, which is a very useful feature for data
> storage systems, but also make it easier for the table store to integrate
> with other systems.
>
> For example, the timestamp type in Hive does not support precision. With
> this extra schema information, however, we can directly deduce the
> precision of a column.

Re: [DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

Posted by Caizhi Weng <ts...@gmail.com>.
Hi all!

+1 for this FLIP. By adding schema information to data files, we can not
only support schema evolution, which is a very useful feature for data
storage systems, but also make it easier for the table store to integrate
with other systems.

For example, the timestamp type in Hive does not support precision. With
this extra schema information, however, we can directly deduce the
precision of a column.
