Posted to user@flink.apache.org by Josh <jo...@gmail.com> on 2016/09/21 17:44:23 UTC

Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Hi,

I have a Flink job which uses the RocksDBStateBackend, which has been
running on a Flink 1.0 cluster.

The job is written in Scala, and I previously made some changes to the job
to ensure that state could be restored. For example, whenever I call `map`
or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
MyCustomFlatMapper())` instead of an anonymous function.

I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to
restore state. I'm seeing exceptions which look like this when trying to
restore from a savepoint:

java.lang.RuntimeException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)

I'm not passing any anonymous functions to `map` or `flatMap` on Flink
DataStreams, so it looks like this exception is caused just from using
Scala functions like `filter`, `map`, `flatMap` on standard Scala
collections, within my class `MyCustomFlatMapper`.

Are there any changes to the way Flink state is restored or to
RocksDBStateBackend, in the last 2-3 months, which could cause this to
happen?

If so, any advice on fixing it?

I'm hoping there's a better solution to this than rewriting the Flink job
in Java.

Thanks,

Josh
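
A note on the class name in the exception above: `MyJob$MyCustomFlatMapper$$anon$4$$anon$2` is a compiler-generated name for an anonymous class, not a name written in the job's source. The sketch below is a rough Java analogue and does not reproduce the Scala compiler's exact naming scheme; `AnonymousClassNames`, `NamedMapper`, and `anonymousMapper` are hypothetical names invented for illustration. It shows that a named class keeps a predictable binary name, while an anonymous class gets a numbered name chosen by the compiler.

```java
import java.util.function.Function;

public class AnonymousClassNames {

    /** A named class: its binary name only changes if the class is renamed or moved. */
    static class NamedMapper implements Function<Integer, Integer> {
        @Override
        public Integer apply(Integer x) {
            return x + 1;
        }
    }

    /** Returns an instance of an anonymous class; the compiler invents its binary name. */
    static Function<Integer, Integer> anonymousMapper() {
        return new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };
    }

    public static void main(String[] args) {
        // The named class's binary name ends in "$NamedMapper".
        System.out.println(new NamedMapper().getClass().getName());
        // The anonymous class's binary name ends in a compiler-assigned number.
        System.out.println(anonymousMapper().getClass().getName());
    }
}
```

Anything that records such a generated name, for example serialized state metadata, can only be read back where a class with exactly that name can be loaded.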

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Posted by Stephan Ewen <se...@apache.org>.
I think Josh found a "WIP" bug. The code is very much in flux because of
the new feature that allows changing the parallelism with which savepoints
are resumed.

The "user code class loader" is not yet properly used in the operator state
backend when reloading snapshot state. This will be integrated soon.
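
The class-loader problem described above can be illustrated without Flink. What follows is a minimal sketch of the general technique, not Flink's actual restore code: a subclass of `java.io.ObjectInputStream` overrides `resolveClass` so that classes are looked up in an explicitly supplied class loader. `UserCodeDeserialization`, `ClassLoaderAwareInputStream`, and the helper methods are hypothetical names, and in this demo the "user code" class loader is simply the application class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.Arrays;

public class UserCodeDeserialization {

    /** ObjectInputStream that resolves classes against an explicitly supplied class loader. */
    static class ClassLoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader userCodeClassLoader;

        ClassLoaderAwareInputStream(InputStream in, ClassLoader userCodeClassLoader)
                throws IOException {
            super(in);
            this.userCodeClassLoader = userCodeClassLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied (user code) class loader first.
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a value, resolving its classes through the given class loader. */
    static Object deserialize(byte[] data, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderAwareInputStream(new ByteArrayInputStream(data), userCodeClassLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the class loader that holds the job's classes.
        ClassLoader userCodeClassLoader = UserCodeDeserialization.class.getClassLoader();
        Object restored =
                deserialize(serialize(new ArrayList<>(Arrays.asList("a", "b"))), userCodeClassLoader);
        System.out.println(restored);
    }
}
```

If the stream instead resolves classes through a class loader that cannot see the job's jar, `readObject` fails with a `ClassNotFoundException`, which is the symptom reported in this thread.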


On Tue, Oct 4, 2016 at 11:36 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Josh,
>
> The internal state representation of Kafka sources has been changed
> recently so that it is now possible to rescale the Kafka sources. That is
> the reason why the old savepoint, which contains the Kafka state in the old
> representation, cannot be read by the updated Kafka sources.
>
> The backwards compatibility feature for the Kafka source is still pending
> implementation. It should be added in the final 1.2 release. Sorry for
> the inconvenience.
>
> Cheers,
> Till

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Posted by Till Rohrmann <tr...@apache.org>.
Hi Josh,

The internal state representation of Kafka sources has been changed
recently so that it is now possible to rescale the Kafka sources. That is
the reason why the old savepoint, which contains the Kafka state in the old
representation, cannot be read by the updated Kafka sources.

The backwards compatibility feature for the Kafka source is still pending
implementation. It should be added in the final 1.2 release. Sorry for
the inconvenience.

Cheers,
Till

On Mon, Oct 3, 2016 at 1:37 PM, Josh <jo...@gmail.com> wrote:

> Hi Stefan,
>
> Sorry for the late reply - I was away last week.
> I've just got round to retrying my above scenario (run my job, take a
> savepoint, restore my job) using the latest Flink 1.2-SNAPSHOT  -- and am
> now seeing a different exception when restoring the state:
>
> 10/03/2016 11:29:02 Job execution switched to status FAILING.
> java.lang.RuntimeException: Could not deserialize NFA.
> at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
> at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
>
> Any ideas what's going on here? Is the Kafka consumer state management
> broken right now in Flink master?
>
> Thanks,
> Josh

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Posted by Josh <jo...@gmail.com>.
Hi Stefan,

Sorry for the late reply - I was away last week.
I've just got round to retrying my above scenario (run my job, take a
savepoint, restore my job) using the latest Flink 1.2-SNAPSHOT  -- and am
now seeing a different exception when restoring the state:

10/03/2016 11:29:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Could not deserialize NFA.
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

Any ideas what's going on here? Is the Kafka consumer state management
broken right now in Flink master?

Thanks,
Josh


On Thu, Sep 22, 2016 at 9:28 AM, Stefan Richter <s.richter@data-artisans.com> wrote:

> Hi,
>
> To me, this looks like you are running into the problem described under
> [FLINK-4603]: KeyedStateBackend cannot restore user code classes. I have
> opened a pull request (PR 2533) this morning that should fix this behavior
> as soon as it is merged into master.
>
> Best,
> Stefan

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

To me, this looks like you are running into the problem described under [FLINK-4603]: KeyedStateBackend cannot restore user code classes. I have opened a pull request (PR 2533) this morning that should fix this behavior as soon as it is merged into master.

Best,
Stefan

> On 21.09.2016 at 23:49, Josh <jo...@gmail.com> wrote:
> 
> Hi Stephan,
> 
> Thanks for the reply. I should have been a bit clearer, but actually I was not trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from scratch (starting with no state), then took a savepoint and tried to restart it from the savepoint - and that's when I get this exception. If I do this with the same job using an older version of Flink (1.1-SNAPSHOT taken in June), the savepoint and restore work fine.
> 
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use (improvements to Kinesis connector + the bucketing sink). Anyway for now I have things working with an older version of Flink - but it would be good to know what's changed recently that's causing the restore to break and if my job is not going to be compatible with future releases of Flink.
> 
> Best,
> Josh

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Posted by Josh <jo...@gmail.com>.
Hi Stephan,

Thanks for the reply. I should have been a bit clearer, but actually I was
not trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from
scratch (starting with no state), then took a savepoint and tried to
restart it from the savepoint - and that's when I get this exception. If I
do this with the same job using an older version of Flink (1.1-SNAPSHOT
taken in June), the savepoint and restore work fine.

I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
(improvements to Kinesis connector + the bucketing sink). Anyway for now I
have things working with an older version of Flink - but it would be good
to know what's changed recently that's causing the restore to break and if
my job is not going to be compatible with future releases of Flink.

Best,
Josh

On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Josh!
>
> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
> now, in order to add the elasticity feature (changing the parallelism of
> running jobs while still maintaining exactly-once guarantees).
> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
> try to add compatibility with 1.1 savepoints before the release of
> version 1.2.
>
> I think the exception is probably caused by the fact that the old savepoint
> stored some serialized user code (the new one is not expected to) which
> cannot be loaded.
>
> Adding Aljoscha and Stefan to this, to see if they can add anything.
> In any case, this should have a much better error message.
>
> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
> so not really recommended for general use.
>
> Does version 1.1 not work for you?
>
> Greetings,
> Stephan

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

Posted by Stephan Ewen <se...@apache.org>.
Hi Josh!

The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
now, in order to add the elasticity feature (changing the parallelism of
running jobs while still maintaining exactly-once guarantees).
At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
try to add compatibility with 1.1 savepoints before the release of
version 1.2.

I think the exception is probably caused by the fact that the old savepoint
stored some serialized user code (the new one is not expected to) which
cannot be loaded.

Adding Aljoscha and Stefan to this, to see if they can add anything.
In any case, this should have a much better error message.

I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
so not really recommended for general use.

Does version 1.1 not work for you?

Greetings,
Stephan


On Wed, Sep 21, 2016 at 7:44 PM, Josh <jo...@gmail.com> wrote:

> Hi,
>
> I have a Flink job which uses the RocksDBStateBackend, which has been
> running on a Flink 1.0 cluster.
>
> The job is written in Scala, and I previously made some changes to the job
> to ensure that state could be restored. For example, whenever I call `map`
> or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
> MyCustomFlatMapper())` instead of an anonymous function.
>
> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
> to restore state. I'm seeing exceptions which look like this when trying to
> restore from a savepoint:
>
> java.lang.RuntimeException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
>
> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
> DataStreams, so it looks like this exception is caused just from using
> Scala functions like `filter`, `map`, `flatMap` on standard Scala
> collections, within my class `MyCustomFlatMapper`.
>
> Are there any changes to the way Flink state is restored or to
> RocksDBStateBackend, in the last 2-3 months, which could cause this to
> happen?
>
> If so, any advice on fixing it?
>
> I'm hoping there's a better solution to this than rewriting the Flink job
> in Java.
>
> Thanks,
>
> Josh
>