Posted to user@flink.apache.org by shamit jain <ja...@gmail.com> on 2022/01/01 18:04:43 UTC

Re: Error while deploying from snapshot after adding new column in existing table

Thanks Martijn! I will check whether the DataStream API fits our use case.

Regards,
Shamit Jain

On Thu, Dec 30, 2021 at 3:44 AM Martijn Visser <ma...@ververica.com>
wrote:

> Hi Shamit,
>
> Yes, the DataStream API offers more possibilities, such as the schema
> evolution described in the link you've included. You could also use the
> State Processor API [1] to read, write, and modify your savepoint.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
> On Wed, 29 Dec 2021 at 22:57, shamit jain <ja...@gmail.com> wrote:
>
>> Thanks Martijn!
>>
>> One more question: can we achieve schema evolution using the DataStream API?
>>
>> In flink documentation, I found [2] "The process of migrating state to
>> adapt to changed schemas happens automatically, and independently for each
>> state. This process is performed internally by Flink by first checking if
>> the new serializer for the state has different serialization schema than
>> the previous serializer; if so, the previous serializer is used to read the
>> state to objects, and written back to bytes again with the new serializer."
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
>>
>> Regards,
>> Shamit Jain
>>
>>
>>
>> On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <ma...@ververica.com>
>> wrote:
>>
>>> Hi Shamit,
>>>
>>> Adding columns means that you're trying to perform schema evolution,
>>> which isn't yet supported by Flink's Table API. Per the documentation [1]:
>>> "Savepoints are only supported if both the query and the Flink version
>>> remain constant."
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>>>
>>> On Wed, 29 Dec 2021 at 04:10, shamit jain <ja...@gmail.com> wrote:
>>>
>>>> Hello Experts,
>>>>
>>>>
>>>> I need help to understand whether we can deploy a job from a snapshot
>>>> after changing the code by adding one new column in an existing table.
>>>>
>>>> We are using the Flink 1.13.2 Table API with RocksDB as the state
>>>> backend. We changed the code to add one new column to the existing table
>>>> and then tried to deploy from the snapshot. While deploying, I get the
>>>> error below:
>>>>
>>>>
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer
>>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>>>> must not be incompatible with the old state serializer
>>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>>>    at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>>>    at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>>    at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>>    at
>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>    at
>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>>>    at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>>>    at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>>>    at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>>    at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>    ...
>>>>
>>>>
>>>> After troubleshooting, I found that the error is raised while comparing
>>>> the fields of the previous and the latest table definitions.
>>>>
>>>> This comparison happens in the flink-table-runtime library, in the class
>>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer, method
>>>> resolveSchemaCompatibility():
>>>>
>>>>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>>>        return TypeSerializerSchemaCompatibility.incompatible();
>>>>    }
>>>>
>>>>
>>>> Could you please help me understand whether we can add a new column to
>>>> an existing table and deploy from the snapshot?
>>>>
>>>> Regards,
>>>> Shamit Jain
>>>>
>>>
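
For readers following the thread, the check quoted in the original message can be illustrated with a minimal plain-Java sketch. It does not use Flink at all: the class name, the Compatibility enum, and the use of strings as stand-ins for Flink's field types are all assumptions made for illustration. The real resolveSchemaCompatibility() does more than this, and the "compatible" branch here is a simplification. The sketch only shows why a strict array comparison reports any added column as incompatible.

```java
import java.util.Arrays;

// Illustration only: a simplified, Flink-free model of the comparison quoted
// in the thread. All names here are hypothetical stand-ins.
class RowTypeCompatibilitySketch {

    // Hypothetical stand-in for the result of a serializer compatibility check.
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    // Same shape as the quoted check: if the field type arrays differ in
    // length, order, or content, the result is INCOMPATIBLE.
    static Compatibility resolve(String[] previousTypes, String[] newTypes) {
        if (!Arrays.equals(previousTypes, newTypes)) {
            return Compatibility.INCOMPATIBLE;
        }
        // Simplification: the real method performs further checks here.
        return Compatibility.COMPATIBLE_AS_IS;
    }

    public static void main(String[] args) {
        String[] before = {"BIGINT", "STRING"};
        String[] afterAddingColumn = {"BIGINT", "STRING", "INT"};

        // Unchanged table definition: the arrays are equal.
        System.out.println(resolve(before, before.clone()));
        // One column added: the arrays differ in length, so the check fails.
        System.out.println(resolve(before, afterAddingColumn));
    }
}
```

Because the comparison is an exact equality test on the whole array, appending a column is treated the same way as changing an existing one, which is consistent with the StateMigrationException reported above.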