Posted to user@flink.apache.org by Steven Nelson <sn...@sourceallies.com> on 2019/10/08 15:17:01 UTC

Problem with savepoint deserialization

Hello! We are working with a Scala-based pipeline.

We changed

case class Record(orgId: Int)

To

case class Record(orgId: Int, operationId: Option[String] = None)

And now restoring from our savepoints fails with this exception:
org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:577)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

I was under the impression we could add fields to case classes and still be
able to use existing state to start the job.

-Steve
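A minimal sketch of what the `= None` default in the new `Record` actually does, using only the Scala standard library (no Flink). In Scala, a default argument is filled in by the compiler at call sites such as `Record(42)`; it is a constructor convenience, not a schema default stored with the class. As far as I know, Flink's case-class serializer does not consult it when reading state written for the old one-field class, which is consistent with the failure above.

```scala
// New shape of the record from the question. The default for operationId is
// applied by the compiler only when a caller omits that argument.
case class Record(orgId: Int, operationId: Option[String] = None)

// One-argument call site: the compiler supplies the default (None).
val fromOldCallSite = Record(42)

// Two-argument call site: the default is not involved.
val withOperation = Record(42, Some("op-1"))

// Every Record has two fields no matter how it was constructed, so any
// binary encoding of it has to cover the second field as well.
val arity = fromOldCallSite.productArity
```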

Re: Problem with savepoint deserialization

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Steven,

From the exception, it seems the serializers used before and after the change
are incompatible. I'm not very familiar with Scala case classes; maybe you can
debug it locally to see which serializer is used for the case class before
and after the change.

Best,
Congxian


Re: Problem with savepoint deserialization

Posted by Steven Nelson <sn...@sourceallies.com>.
No problem :)

I wasn’t able to find documentation on what can and cannot be upgraded for case classes. I had assumed the same rules that apply to POJO schema evolution applied to case classes. Has someone put together rules for case classes? I should also have mentioned that we are running Flink 1.9.



Sent from my iPhone
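For comparison with the POJO rules mentioned above: Flink's state schema evolution documentation states that schema evolution is currently supported only for POJO and Avro types, and that a POJO may gain new fields (initialized to the Java default for their type). A Scala class intended to meet Flink's POJO rules (public class, public no-argument constructor, fields reachable through getters and setters) might look like the sketch below. `RecordPojo` is a hypothetical name, `@BeanProperty` is just one way to generate the accessors, the nullable `String` standing in for `Option[String]` is a simplification, and whether Flink's type extraction really classifies this class as a POJO should be verified against the Flink version in use.

```scala
import scala.beans.BeanProperty

// Hypothetical POJO-shaped variant of Record (not from the thread).
// @BeanProperty generates Java-style getOrgId/setOrgId and
// getOperationId/setOperationId accessors for the mutable fields.
class RecordPojo(
    @BeanProperty var orgId: Int,
    // Nullable String used in place of Option[String] to keep the sketch simple.
    @BeanProperty var operationId: String
) {
  // Public no-argument constructor, as the POJO rules require.
  def this() = this(0, null)
}

// Usage: create with the no-arg constructor, then populate via setters.
val r = new RecordPojo()
r.setOrgId(7)
r.setOperationId("op-1")
```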


Re: Problem with savepoint deserialization

Posted by Aleksandar Mastilovic <am...@sightmachine.com>.
I’m pretty sure java.util.Optional is not serializable: https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc

However, on a second look I can see you’re using Scala’s Option, which IS serializable :) My apologies for that.
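A small standard-library check of the correction above: `scala.Option` implements `java.io.Serializable` and survives a plain Java serialization round trip, while `java.util.Optional` does not implement that interface. One caveat: as far as I know, Flink does not use Java serialization for case-class state at all (it derives its own serializer from the type), so Java serializability by itself does not explain the `StateMigrationException`. The helper name `javaRoundTrip` is hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Writes a value with plain Java serialization (java.io) and reads it back.
def javaRoundTrip[T](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

// scala.Option (and therefore Some and None) implements java.io.Serializable...
val optionIsSerializable =
  classOf[java.io.Serializable].isAssignableFrom(classOf[Option[_]])

// ...whereas java.util.Optional does not.
val optionalIsSerializable =
  classOf[java.io.Serializable].isAssignableFrom(classOf[java.util.Optional[_]])

// Both Option cases round-trip through Java serialization.
val someBack = javaRoundTrip(Some("op-1"): Option[String])
val noneBack = javaRoundTrip(None: Option[String])
```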

So your problem was that your savepoint contained the previous version of the class, which of course cannot be deserialized into the new version without custom code that handles the missing Option field.
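A minimal, Flink-free sketch of the "custom code" idea: keep the old shape under a separate, hypothetical name (`RecordV1`) and convert old values explicitly, choosing a value for the new field instead of relying on the constructor default. How to apply such a conversion to state held in an existing savepoint is not shown here and would depend on the job; treat this purely as an illustration.

```scala
// Hypothetical names: RecordV1 stands for the class as it was stored in the
// old savepoint, Record for the new shape from the question.
case class RecordV1(orgId: Int)
case class Record(orgId: Int, operationId: Option[String] = None)

// Upgrade an old value, setting the new field explicitly.
def upgrade(old: RecordV1): Record =
  Record(orgId = old.orgId, operationId = None)

// Usage: convert a batch of old-shaped values.
val migrated = Seq(RecordV1(1), RecordV1(2)).map(upgrade)
```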


Re: Problem with savepoint deserialization

Posted by Steven Nelson <sn...@sourceallies.com>.
Are you sure? I just restarted the job with the new version, not from a savepoint, then took a new savepoint, and it seemed to work from there. It just seems like it couldn’t upgrade the schema during restore.


Re: Problem with savepoint deserialization

Posted by Aleksandar Mastilovic <am...@sightmachine.com>.
The Option class is not serializable; if you put something serializable into that case class, you wouldn’t have problems.
