Posted to user@flink.apache.org by shamit jain <ja...@gmail.com> on 2021/12/29 03:10:03 UTC
Error while deploying from snapshot after adding new column in existing table
Hello Experts,
I need help to understand whether we can deploy a job from a snapshot after
changing the code to add one new column to an existing table.
We are using the Flink 1.13.2 Table API with RocksDB as the state backend. We
added one new column to an existing table and then tried to deploy from the
snapshot. While deploying, I'm getting the error below:
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336) must
not be incompatible with the old state serializer
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
...
After troubleshooting, I found that this error occurs while comparing the
fields of the previous and latest table definitions.
The comparison happens in the flink-table-runtime class
org.apache.flink.table.runtime.typeutils.RowDataSerializer, in the method
resolveSchemaCompatibility():
if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
    return TypeSerializerSchemaCompatibility.incompatible();
}
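For illustration, here is a minimal, self-contained sketch of how that check behaves. The string type names below are hypothetical placeholders; the real code compares the serializer's Flink LogicalType field types, not strings:

```java
import java.util.Arrays;

public class CompatibilityCheckSketch {
    // Rough stand-in for the check in RowDataSerializer#resolveSchemaCompatibility:
    // the field types restored from the snapshot must match the new serializer's
    // field types exactly, element by element.
    static boolean isCompatible(String[] previousTypes, String[] newTypes) {
        return Arrays.equals(previousTypes, newTypes);
    }

    public static void main(String[] args) {
        String[] previousTypes = {"INT", "STRING"};           // schema stored in the snapshot
        String[] newTypes      = {"INT", "STRING", "DOUBLE"}; // schema after adding a column

        // Adding a column changes the array length, so Arrays.equals returns
        // false and the serializers are reported as incompatible.
        System.out.println(isCompatible(previousTypes, previousTypes)); // true
        System.out.println(isCompatible(previousTypes, newTypes));      // false
    }
}
```

So any added, removed, or re-typed column makes the two arrays unequal, which is why the restore fails with StateMigrationException.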
Can you please help me understand whether we can add a new column to an
existing table and still deploy from the snapshot?
Regards,
Shamit Jain
Re: Error while deploying from snapshot after adding new column in existing table
Posted by shamit jain <ja...@gmail.com>.
Thanks Martijn! I will check whether the DataStream API fits our use case.
Regards,
Shamit Jain
On Thu, Dec 30, 2021 at 3:44 AM Martijn Visser <ma...@ververica.com> wrote:
Re: Error while deploying from snapshot after adding new column in existing table
Posted by Martijn Visser <ma...@ververica.com>.
Hi Shamit,
Yes, there are more possibilities when using the DataStream API, as in the
link you've included. You could also use the State Processor API [1] to
read, write, and modify your savepoint.
Best regards,
Martijn
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
On Wed, 29 Dec 2021 at 22:57, shamit jain <ja...@gmail.com> wrote:
Re: Error while deploying from snapshot after adding new column in existing table
Posted by shamit jain <ja...@gmail.com>.
Thanks Martijn!
One more question: can we achieve schema evolution using the DataStream API?
In the Flink documentation, I found [2]: "The process of migrating state to
adapt to changed schemas happens automatically, and independently for each
state. This process is performed internally by Flink by first checking if
the new serializer for the state has different serialization schema than
the previous serializer; if so, the previous serializer is used to read the
state to objects, and written back to bytes again with the new serializer."
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
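The mechanism that passage describes, decode with the previous serializer and re-encode with the new one, can be sketched conceptually as below. This is a toy model only, not Flink's actual serializer classes: rows are modeled as comma-joined strings, and the new schema is assumed to append one column filled with a default value:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MigrationSketch {
    // "Previous serializer": decodes state bytes written under the old
    // 2-column schema into an in-memory row.
    static List<String> oldDeserialize(String storedBytes) {
        return new ArrayList<>(Arrays.asList(storedBytes.split(",")));
    }

    // "New serializer": encodes the row under the new 3-column schema,
    // padding the added trailing column with a default value.
    static String newSerialize(List<String> row) {
        if (row.size() == 2) {
            row.add("<default>");
        }
        return String.join(",", row);
    }

    public static void main(String[] args) {
        String storedState = "42,hello";             // written under the old schema
        List<String> row = oldDeserialize(storedState);
        String migrated = newSerialize(row);         // re-written under the new schema
        System.out.println(migrated);                // 42,hello,<default>
    }
}
```

Note that this automatic migration only happens when the new serializer reports itself as compatible-after-migration; as shown by the stack trace earlier in the thread, RowDataSerializer in Flink 1.13 instead reports incompatible, so the Table API does not take this path.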
Regards,
Shamit Jain
On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <ma...@ververica.com> wrote:
Re: Error while deploying from snapshot after adding new column in existing table
Posted by Martijn Visser <ma...@ververica.com>.
Hi Shamit,
Adding columns means that you're trying to perform schema evolution, which
isn't yet supported by Flink. Per the documentation [1]: "Savepoints are only
supported if both the query and the Flink version remain constant".
Best regards,
Martijn
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
On Wed, 29 Dec 2021 at 04:10, shamit jain <ja...@gmail.com> wrote: