Posted to user@flink.apache.org by Ashish Khatkar via user <us...@flink.apache.org> on 2023/03/17 12:48:38 UTC

Way to add columns with defaults to the existing table and recover from the savepoint

Hi all,

I need help in understanding whether we can add columns with defaults, let's say
NULL, to an existing table and then recover the job from the savepoint.

We are using the Flink 1.16.0 Table API with RocksDB as the state backend to
provide a service that lets our users run SQL queries. The tables are created
from Avro schemas, and when a schema is changed in a compatible manner, i.e.
by adding a field with a default, we are unable to recover the job from the
savepoint. This is the error we get after the schema is upgraded:

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must not be incompatible with the old state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984).

We tried to debug the issue, and the error originates from

org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot#resolveSchemaCompatibility (lines 343-345),

which checks the length of the type array as well as the logicalType of each
element, i.e. of each column.
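
For illustration, here is a minimal Java sketch of what that check amounts to
as we understand it (a paraphrase of the behavior, not the actual Flink
source; the method name sameRowStructure is made up):

import org.apache.flink.table.types.logical.LogicalType;

public class RowCompatibilitySketch {
    // Paraphrased gist of the check: the restore is rejected whenever the
    // column count or any column's logical type differs from the snapshot
    // that was written into the savepoint.
    static boolean sameRowStructure(LogicalType[] previousTypes, LogicalType[] newTypes) {
        if (previousTypes.length != newTypes.length) {
            return false; // adding a column changes the arity
        }
        for (int i = 0; i < previousTypes.length; i++) {
            if (!previousTypes[i].equals(newTypes[i])) {
                return false; // any per-column type change is rejected too
            }
        }
        return true;
    }
}

So even an Avro-compatible change, such as a new field with a default, fails
the check, because the new RowType simply has one more column than the old one.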

Is there a way to restore and evolve a table using the Table API when the Avro
schema evolves in a compatible manner? If not, are there any plans to support
upgrades and evolution in the Table API?

Cheers,
Ashish Khatkar

Re: [External] Re: Way to add columns with defaults to the existing table and recover from the savepoint

Posted by Martijn Visser <ma...@apache.org>.
You could consider trying out the experimental version upgrade that was
introduced as part of FLIP-190: https://cwiki.apache.org/confluence/x/KZBnCw
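
A minimal sketch of how that could look (the DDL and table names here are
placeholders using the built-in datagen/blackhole connectors; note that
FLIP-190 pins a compiled plan across Flink version upgrades and does not by
itself give you schema evolution for state):

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder tables standing in for the real sources and sinks.
        tableEnv.executeSql(
                "CREATE TABLE source_table (id BIGINT) WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
                "CREATE TABLE sink_table (id BIGINT) WITH ('connector' = 'blackhole')");

        // First deployment: compile the statement to a JSON plan and persist it.
        CompiledPlan plan =
                tableEnv.compilePlanSql("INSERT INTO sink_table SELECT * FROM source_table");
        plan.writeToFile("/tmp/job-plan.json");

        // Later deployments: load and execute the pinned plan instead of
        // re-planning the SQL, so the runtime topology stays stable.
        tableEnv.loadPlan(PlanReference.fromFile("/tmp/job-plan.json")).execute();
    }
}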


Re: [External] Re: Way to add columns with defaults to the existing table and recover from the savepoint

Posted by Ashish Khatkar via user <us...@flink.apache.org>.
Hi Shammon,

Schema evolution works with Avro-type state, but the Flink Table API uses
RowData, whose serializer (RowDataSerializer) doesn't allow changes to the
column structure. Regarding the State Processor API: we are not creating any
state in our service; we simply use Flink SQL as a black box and let it handle
the state. We create SQL tables from the Avro schemas in a
StreamTableEnvironment and run queries on those tables by building a
StatementSet and calling execute() on it, roughly as in the sketch below.
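
In code terms the service does roughly the following (a minimal sketch; the
table names are made up and the built-in datagen/print connectors stand in
for our real Avro-backed tables):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlServiceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The DDL is generated from the users' Avro schemas.
        tableEnv.executeSql(
                "CREATE TABLE input_table (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
                "CREATE TABLE output_table (id BIGINT, name STRING) WITH ('connector' = 'print')");

        // All user queries are collected into one StatementSet and submitted together.
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql(
                "INSERT INTO output_table SELECT id, name FROM input_table");
        statementSet.execute();
    }
}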

I found the Flink doc on stateful upgrades and evolution [1], and according to
it this is not possible to achieve.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

Cheers
Ashish


Re: Way to add columns with defaults to the existing table and recover from the savepoint

Posted by Shammon FY <zj...@gmail.com>.
Hi Ashish

State compatibility is a complex issue. You can review the state evolution [1]
and State Processor API [2] docs to see if there's a solution for your
problem; a small reader sketch follows the links below.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
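
For reference, a minimal State Processor API reader sketch could look like
this (the savepoint path, operator uid, and state name are all placeholders,
and it needs the flink-state-processor-api dependency; note that Table API /
SQL jobs generate operator uids and state descriptors internally, which makes
this hard to apply to them):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadSavepointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder savepoint path and operator uid.
        SavepointReader savepoint = SavepointReader.read(
                env, "file:///tmp/savepoints/savepoint-xxxx", new HashMapStateBackend());
        savepoint.readKeyedState("my-operator-uid", new CountReader()).print();

        env.execute("read-savepoint");
    }

    // Reads a hypothetical per-key ValueState named "count" out of the savepoint.
    static class CountReader extends KeyedStateReaderFunction<Long, String> {
        private ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(Long key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + count.value());
        }
    }
}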

Best,
Shammon FY

