You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shamit jain <ja...@gmail.com> on 2021/12/29 03:10:03 UTC

Error while deploying from snapshot after adding new column in existing table

Hello Experts,


I need help to understand whether we can deploy a job from a snapshot after
changing the code by adding one new column in an existing table.

We are using flink-1.13.2 table API and RocksDB as backend. We have changed
the code and added one new column in the existing table and later, tried to
deploy from the snapshot. While deploying, I'm getting the below error:-


Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336) must
not be incompatible with the old state serializer
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
   at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
   at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
   at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
   at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
   at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
   at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
   at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
   at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
   at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
   ...


After troubleshooting, I found we are getting this error while comparing
the fields of previous and latest table definition.

This comparision is happening from flink-table-runtime library  class:
org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
resolveSchemaCompatibility()

   if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
                return TypeSerializerSchemaCompatibility.incompatible();
            }


Can you please help me to understand if we can add a new column in an
existing table and deploy from the snapshot?

Regards,
Shamit Jain

Re: Error while deploying from snapshot after adding new column in existing table

Posted by shamit jain <ja...@gmail.com>.
Thanks Martijn! I will check Datastream APIs, if it fits in our use case.

Regards,
Shamit Jain

On Thu, Dec 30, 2021 at 3:44 AM Martijn Visser <ma...@ververica.com>
wrote:

> Hi Shamit,
>
> Yes, there are more possibilities when using the DataStream API like with
> the link you've included. You could also use the State Processor API [1] to
> read, write & modify your savepoint.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
> On Wed, 29 Dec 2021 at 22:57, shamit jain <ja...@gmail.com> wrote:
>
>> Thanks Martijn!
>>
>> One more question, Can we achieve(schema evolution) using DataStream APIs?
>>
>> In flink documentation, I found [2] "The process of migrating state to
>> adapt to changed schemas happens automatically, and independently for each
>> state. This process is performed internally by Flink by first checking if
>> the new serializer for the state has different serialization schema than
>> the previous serializer; if so, the previous serializer is used to read the
>> state to objects, and written back to bytes again with the new serializer."
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
>>
>> Regards,
>> Shamit Jain
>>
>>
>>
>> On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <ma...@ververica.com>
>> wrote:
>>
>>> Hi Shamit,
>>>
>>> Adding columns means that you're trying to perform schema evolution,
>>> which isn't yet supported by Flink per the documentation [1] "Savepoints
>>> are only supported if both the query and the Flink version remain constant"
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>>>
>>> On Wed, 29 Dec 2021 at 04:10, shamit jain <ja...@gmail.com> wrote:
>>>
>>>> Hello Experts,
>>>>
>>>>
>>>> I need help to understand whether we can deploy a job from a snapshot
>>>> after changing the code by adding one new column in an existing table.
>>>>
>>>> We are using flink-1.13.2 table API and RocksDB as backend. We have
>>>> changed the code and added one new column in the existing table and later,
>>>> tried to deploy from the snapshot. While deploying, I'm getting the below
>>>> error:-
>>>>
>>>>
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer
>>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>>>> must not be incompatible with the old state serializer
>>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>>>    at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>>>    at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>>    at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>>    at
>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>    at
>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>>>    at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>>>    at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>>>    at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>>    at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>    ...
>>>>
>>>>
>>>> After troubleshooting, I found we are getting this error while
>>>> comparing the fields of previous and latest table definition.
>>>>
>>>> This comparision is happening from flink-table-runtime library  class:
>>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
>>>> resolveSchemaCompatibility()
>>>>
>>>>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>>>                 return TypeSerializerSchemaCompatibility.incompatible();
>>>>             }
>>>>
>>>>
>>>> Can you please help me to understand if we can add a new column in an
>>>> existing table and deploy from the snapshot?
>>>>
>>>> Regards,
>>>> Shamit Jain
>>>>
>>>

Re: Error while deploying from snapshot after adding new column in existing table

Posted by Martijn Visser <ma...@ververica.com>.
Hi Shamit,

Yes, there are more possibilities when using the DataStream API like with
the link you've included. You could also use the State Processor API [1] to
read, write & modify your savepoint.

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

On Wed, 29 Dec 2021 at 22:57, shamit jain <ja...@gmail.com> wrote:

> Thanks Martijn!
>
> One more question, Can we achieve(schema evolution) using DataStream APIs?
>
> In flink documentation, I found [2] "The process of migrating state to
> adapt to changed schemas happens automatically, and independently for each
> state. This process is performed internally by Flink by first checking if
> the new serializer for the state has different serialization schema than
> the previous serializer; if so, the previous serializer is used to read the
> state to objects, and written back to bytes again with the new serializer."
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
>
> Regards,
> Shamit Jain
>
>
>
> On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <ma...@ververica.com>
> wrote:
>
>> Hi Shamit,
>>
>> Adding columns means that you're trying to perform schema evolution,
>> which isn't yet supported by Flink per the documentation [1] "Savepoints
>> are only supported if both the query and the Flink version remain constant"
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>>
>> On Wed, 29 Dec 2021 at 04:10, shamit jain <ja...@gmail.com> wrote:
>>
>>> Hello Experts,
>>>
>>>
>>> I need help to understand whether we can deploy a job from a snapshot
>>> after changing the code by adding one new column in an existing table.
>>>
>>> We are using flink-1.13.2 table API and RocksDB as backend. We have
>>> changed the code and added one new column in the existing table and later,
>>> tried to deploy from the snapshot. While deploying, I'm getting the below
>>> error:-
>>>
>>>
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>> serializer
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>>> must not be incompatible with the old state serializer
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>>    at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>>    at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>    at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>    at
>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>    at
>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>>    at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>>    at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>>    at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>    at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>    ...
>>>
>>>
>>> After troubleshooting, I found we are getting this error while comparing
>>> the fields of previous and latest table definition.
>>>
>>> This comparision is happening from flink-table-runtime library  class:
>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
>>> resolveSchemaCompatibility()
>>>
>>>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>>                 return TypeSerializerSchemaCompatibility.incompatible();
>>>             }
>>>
>>>
>>> Can you please help me to understand if we can add a new column in an
>>> existing table and deploy from the snapshot?
>>>
>>> Regards,
>>> Shamit Jain
>>>
>>

Re: Error while deploying from snapshot after adding new column in existing table

Posted by shamit jain <ja...@gmail.com>.
Thanks Martijn!

One more question, Can we achieve(schema evolution) using DataStream APIs?

In flink documentation, I found [2] "The process of migrating state to
adapt to changed schemas happens automatically, and independently for each
state. This process is performed internally by Flink by first checking if
the new serializer for the state has different serialization schema than
the previous serializer; if so, the previous serializer is used to read the
state to objects, and written back to bytes again with the new serializer."

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema

Regards,
Shamit Jain



On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <ma...@ververica.com>
wrote:

> Hi Shamit,
>
> Adding columns means that you're trying to perform schema evolution, which
> isn't yet supported by Flink per the documentation [1] "Savepoints are only
> supported if both the query and the Flink version remain constant"
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>
> On Wed, 29 Dec 2021 at 04:10, shamit jain <ja...@gmail.com> wrote:
>
>> Hello Experts,
>>
>>
>> I need help to understand whether we can deploy a job from a snapshot
>> after changing the code by adding one new column in an existing table.
>>
>> We are using flink-1.13.2 table API and RocksDB as backend. We have
>> changed the code and added one new column in the existing table and later,
>> tried to deploy from the snapshot. While deploying, I'm getting the below
>> error:-
>>
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>> must not be incompatible with the old state serializer
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>    at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>    at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>    at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>    at
>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>    at
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>    at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>    at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>    at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>    at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>    ...
>>
>>
>> After troubleshooting, I found we are getting this error while comparing
>> the fields of previous and latest table definition.
>>
>> This comparision is happening from flink-table-runtime library  class:
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
>> resolveSchemaCompatibility()
>>
>>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>                 return TypeSerializerSchemaCompatibility.incompatible();
>>             }
>>
>>
>> Can you please help me to understand if we can add a new column in an
>> existing table and deploy from the snapshot?
>>
>> Regards,
>> Shamit Jain
>>
>

Re: Error while deploying from snapshot after adding new column in existing table

Posted by Martijn Visser <ma...@ververica.com>.
Hi Shamit,

Adding columns means that you're trying to perform schema evolution, which
isn't yet supported by Flink per the documentation [1] "Savepoints are only
supported if both the query and the Flink version remain constant"

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

On Wed, 29 Dec 2021 at 04:10, shamit jain <ja...@gmail.com> wrote:

> Hello Experts,
>
>
> I need help to understand whether we can deploy a job from a snapshot
> after changing the code by adding one new column in an existing table.
>
> We are using flink-1.13.2 table API and RocksDB as backend. We have
> changed the code and added one new column in the existing table and later,
> tried to deploy from the snapshot. While deploying, I'm getting the below
> error:-
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
> must not be incompatible with the old state serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>    at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>    at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>    at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>    at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>    at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>    at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>    ...
>
>
> After troubleshooting, I found we are getting this error while comparing
> the fields of previous and latest table definition.
>
> This comparision is happening from flink-table-runtime library  class:
> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
> resolveSchemaCompatibility()
>
>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>                 return TypeSerializerSchemaCompatibility.incompatible();
>             }
>
>
> Can you please help me to understand if we can add a new column in an
> existing table and deploy from the snapshot?
>
> Regards,
> Shamit Jain
>