Posted to user@flink.apache.org by Josh <jo...@gmail.com> on 2016/06/29 19:00:13 UTC

How to avoid breaking states when upgrading Flink job?

Hi all,
Is there any information out there on how to avoid breaking saved
states/savepoints when making changes to a Flink job and redeploying it?

I want to know how to avoid exceptions like this:

java.lang.RuntimeException: Failed to deserialize state handle and
setup initial operator state.
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
com.me.flink.MyJob$$anon$1$$anon$7$$anon$4


The best information I could find in the docs is here:

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html


Having made the suggested changes to my job (i.e. giving a uid to
every stateful sink and map function), what changes to the
job/topology are then allowed/not allowed?


If I'm 'naming' my states by providing uids, why does Flink need to
look for a specific class, like
com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?


Thanks for any advice,

Josh
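
[The uid mechanism the savepoints doc describes amounts to keying operator state by a stable, user-chosen name (in the DataStream API, e.g. stream.map(fn).uid("my-map")). Below is a self-contained sketch of why a name key survives refactoring while a generated-class-name key does not; the uid string and class names are made up for illustration:]

```java
import java.util.HashMap;
import java.util.Map;

public class UidSketch {
    // Stand-in "savepoint": operator state keyed by a stable identifier.
    static Map<String, Integer> byUid = new HashMap<>();
    static Map<String, Integer> byClassName = new HashMap<>();

    static void snapshot() {
        byUid.put("my-stateful-map", 42);     // explicit uid, chosen by the user
        byClassName.put("MyJob$$anon$7", 42); // compiler-generated class name
    }

    // After a recompile, explicit uids are unchanged, but the compiler may
    // have re-numbered the anonymous classes (e.g. $$anon$7 -> $$anon$4),
    // so the class-name key no longer matches anything in the savepoint.
    static Integer restoreByUid()       { return byUid.get("my-stateful-map"); }
    static Integer restoreByClassName() { return byClassName.get("MyJob$$anon$4"); }

    public static void main(String[] args) {
        snapshot();
        System.out.println(restoreByUid());       // 42
        System.out.println(restoreByClassName()); // null -- state is lost
    }
}
```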

Re: How to avoid breaking states when upgrading Flink job?

Posted by Ufuk Celebi <uc...@apache.org>.
Judging from the stack trace, the state should be part of the operator
state and not the partitioned RocksDB state. If you have implemented
the Checkpointed interface anywhere, that would be a good place to
pinpoint the anonymous class. Is it possible to share the job code?

– Ufuk

On Fri, Jul 1, 2016 at 6:01 PM, Aljoscha Krettek <al...@apache.org> wrote:
> Ah, this might be in code that runs at a different layer from the
> StateBackend. Can you maybe pinpoint which of your user classes is this
> anonymous class and where it is used? Maybe by replacing them by
> non-anonymous classes and checking which replacement fixes the problem.
>
> -
> Aljoscha
>
> On Fri, 1 Jul 2016 at 16:27 Josh <jo...@gmail.com> wrote:
>>
>> I've just double checked and I do still get the ClassNotFound error for an
>> anonymous class, on a job which uses the RocksDBStateBackend.
>>
>>
>> On Fri, Jul 1, 2016 at 10:21 AM, Josh <jo...@gmail.com> wrote:
>>>
>>> Thanks guys, that's very helpful info!
>>>
>>> @Aljoscha I thought I saw this exception on a job that was using the
>>> RocksDB state backend, but I'm not sure. I will do some more tests today to
>>> double check. If it's still a problem I'll try the explicit class
>>> definitions solution.
>>>
>>> Josh
>>>
>>>
>>> On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>>
>>>> Also, you're using the FsStateBackend, correct?
>>>>
>>>> Reason I'm asking is that the problem should not occur for the RocksDB
>>>> state backend. There, we don't serialize any user code, only binary data. A
>>>> while back I wanted to change the FsStateBackend to also work like this. Now
>>>> might be a good time to actually do this. :-)
>>>>
>>>> On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <tr...@apache.org> wrote:
>>>>>
>>>>> Hi Josh,
>>>>>
>>>>> you could also try to replace your anonymous classes by explicit class
>>>>> definitions. This should assign these classes a fixed name independent of
>>>>> the other anonymous classes. Then the class loader should be able to
>>>>> deserialize your serialized data.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <al...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>> Hi Josh,
>>>>>> I think in your case the problem is that Scala might choose different
>>>>>> names for synthetic/generated classes. This will trip up the code that is
>>>>>> trying to restore from a snapshot that was done with an earlier version of
>>>>>> the code where classes were named differently.
>>>>>>
>>>>>> I'm afraid I don't know how to solve this one right now, except by
>>>>>> switching to Java.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <mx...@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Josh,
>>>>>>>
>>>>>>> You have to assign UIDs to all operators to change the topology.
>>>>>>> Plus,
>>>>>>> you have to add dummy operators for all UIDs which you removed; this
>>>>>>> is a limitation currently because Flink will attempt to find all UIDs
>>>>>>> of the old job.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jo...@gmail.com> wrote:
>>>>>>> > Hi all,
>>>>>>> > Is there any information out there on how to avoid breaking saved
>>>>>>> > states/savepoints when making changes to a Flink job and
>>>>>>> > redeploying it?
>>>>>>> >
>>>>>>> > I want to know how to avoid exceptions like this:
>>>>>>> >
>>>>>>> > java.lang.RuntimeException: Failed to deserialize state handle and
>>>>>>> > setup
>>>>>>> > initial operator state.
>>>>>>> >       at
>>>>>>> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>>>>>> >       at java.lang.Thread.run(Thread.java:745)
>>>>>>> > Caused by: java.lang.ClassNotFoundException:
>>>>>>> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>>>>>> >
>>>>>>> >
>>>>>>> > The best information I could find in the docs is here:
>>>>>>> >
>>>>>>> >
>>>>>>> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>>>>>> >
>>>>>>> >
>>>>>>> > Having made the suggested changes to my job (i.e. giving a uid to
>>>>>>> > every
>>>>>>> > stateful sink and map function), what changes to the job/topology
>>>>>>> > are then
>>>>>>> > allowed/not allowed?
>>>>>>> >
>>>>>>> >
>>>>>>> > If I'm 'naming' my states by providing uids, why does Flink need to
>>>>>>> > look for
>>>>>>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>>>>>>> >
>>>>>>> >
>>>>>>> > Thanks for any advice,
>>>>>>> >
>>>>>>> > Josh
>>>>>
>>>>>
>>>
>>
>

Re: How to avoid breaking states when upgrading Flink job?

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, this might be in code that runs at a different layer from the
StateBackend. Can you maybe pinpoint which of your user classes this
anonymous class is, and where it is used? Perhaps by replacing them with
non-anonymous classes and checking which replacement fixes the problem.

-
Aljoscha
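
[Aljoscha's replace-and-check suggestion works because the compiler numbers anonymous classes positionally within their enclosing class, so adding or removing one shifts the names of those after it. A small self-contained javac illustration (scalac's $$anon$N numbering behaves analogously); the class and method names here are made up:]

```java
// The first anonymous class declared in this file becomes AnonNames$1,
// the second AnonNames$2. Inserting a new anonymous class before an
// existing one shifts these numbers, which breaks deserialization of
// any state that recorded the old name.
public class AnonNames {
    static String first() {
        Runnable r = new Runnable() { public void run() {} };
        return r.getClass().getName(); // ends with "$1"
    }

    static String second() {
        Runnable r = new Runnable() { public void run() {} };
        return r.getClass().getName(); // ends with "$2"
    }

    public static void main(String[] args) {
        System.out.println(first());
        System.out.println(second());
    }
}
```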


Re: How to avoid breaking states when upgrading Flink job?

Posted by Josh <jo...@gmail.com>.
I've just double checked and I do still get the ClassNotFound error for an
anonymous class, on a job which uses the RocksDBStateBackend.

In case it helps, this was the full stack trace:

java.lang.RuntimeException: Failed to deserialize state handle and
setup initial operator state.
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
com.me.flink.job.MyJob$$anon$1$$anon$4$$anon$3
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
	at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at java.util.ArrayList.readObject(ArrayList.java:791)
	at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at java.util.HashMap.readObject(HashMap.java:1396)
	at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
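
[What the trace shows is plain Java serialization: the StateDescriptor is written with ObjectOutputStream, which embeds the fully-qualified class name of everything it writes, and restore resolves those names with Class.forName. A minimal self-contained illustration of the embedded name (class names here are made up):]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class EmbeddedName {
    // A named serializable class standing in for an operator's state
    // descriptor; ObjectOutputStream records its fully-qualified class
    // name in the byte stream it produces.
    static class MyState implements Serializable {
        private static final long serialVersionUID = 1L;
        int count = 42;
    }

    // Returns true if the serialized bytes literally contain the class name.
    static boolean streamEmbedsClassName() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new MyState());
        }
        String dump = new String(bos.toByteArray(), StandardCharsets.ISO_8859_1);
        return dump.contains("EmbeddedName$MyState");
    }

    public static void main(String[] args) throws IOException {
        // Restoring reads that name back and resolves it by reflection, so a
        // renamed or re-numbered class fails with ClassNotFoundException.
        System.out.println(streamEmbedsClassName());
    }
}
```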



Re: How to avoid breaking states when upgrading Flink job?

Posted by Josh <jo...@gmail.com>.
Thanks guys, that's very helpful info!

@Aljoscha I thought I saw this exception on a job that was using the
RocksDB state backend, but I'm not sure. I will do some more tests today to
double check. If it's still a problem I'll try the explicit class
definitions solution.

Josh


Re: Accessing external DB inside RichFlatMap Function

Posted by Kostas Kloudas <k....@data-artisans.com>.
Yes, Chesnay is right. You can open and close the connection in 
the open() and close() methods of your RichFlatMapFunction.

Kostas
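
[The lifecycle being described, sketched with stand-in types so it runs on its own (the class and connection names are made up; in a real job the class would extend Flink's RichFlatMapFunction and the runtime would call open() and close() for you):]

```java
import java.util.ArrayList;
import java.util.List;

public class DbFlatMapSketch {
    // Stand-in for a real DB connection (e.g. a java.sql.Connection).
    static class FakeConnection {
        boolean open = true;
        void close() { open = false; }
    }

    static class DbFlatMap {
        FakeConnection conn;
        List<String> out = new ArrayList<>();

        void open() {                // called once, before any element
            conn = new FakeConnection();
        }

        void flatMap(String value) { // called per element; reuses the connection
            if (conn != null && conn.open) out.add(value.toUpperCase());
        }

        void close() {               // called once, on shutdown or failure
            if (conn != null) conn.close();
        }
    }

    public static void main(String[] args) {
        DbFlatMap fn = new DbFlatMap();
        fn.open();
        fn.flatMap("a");
        fn.flatMap("b");
        fn.close();
        System.out.println(fn.out); // [A, B]
    }
}
```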

> On Jul 7, 2016, at 11:03 AM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> Couldn't he do the same thing in his RichFlatMap?
> 
> open the db connection in open(), close it in close(), do stuff within these calls.
> 
> On 07.07.2016 10:58, Kostas Kloudas wrote:
>> Hi Simon,
>> 
>> If your job reads or writes to a DB, I would suggest using one of the already existing Flink sources or sinks,
>> as this allows for efficient connection handling and managing.
>> 
>> If you want to write intermediate data to a DB from an operator, then I suppose that you should implement
>> your own custom operator that opens a DB connection in the open() method and closes it in close().
>> If you are planning to do so, I think that code of your custom operator would be the same as the StreamFlatMap,
>> with the addition of the openDBConnection method in the open(), and you should also override the close() method
>> of the AbstractUdfStreamOperator to 1) call super.close() and 2) close the db connection.
>> 
>> Let me know if this works,
>> Kostas
>> 
>> 
>>> On Jul 7, 2016, at 10:38 AM, simon peyer <si...@soom-it.ch> wrote:
>>> 
>>> Hi guys
>>> 
>>> Is there an easy way to handle external DB connections inside a RichFlatMap Function?
>>> 
>>> --Thanks Simon
>> 
> 


Re: Accessing external DB inside RichFlatMap Function

Posted by Chesnay Schepler <ch...@apache.org>.
Couldn't he do the same thing in his RichFlatMap?

open the db connection in open(), close it in close(), do stuff within 
these calls.

On 07.07.2016 10:58, Kostas Kloudas wrote:
> Hi Simon,
>
> If your job reads from or writes to a DB, I would suggest using one of the already existing Flink sources or sinks,
> as this allows for efficient connection handling and management.
>
> If you want to write intermediate data to a DB from an operator, then I suppose that you should implement
> your own custom operator that opens a DB connection in the open() method and closes it in close().
> If you are planning to do so, I think the code of your custom operator would be the same as the StreamFlatMap,
> with the addition of the openDBConnection method in the open(), and you should also override the close() method
> of the AbstractUdfStreamOperator to 1) call super.close() and 2) close the db connection.
>
> Let me know if this works,
> Kostas
>
>
>> On Jul 7, 2016, at 10:38 AM, simon peyer <si...@soom-it.ch> wrote:
>>
>> Hi guys
>>
>> Is there an easy way to handle external DB connections inside a RichFlatMap Function?
>>
>> --Thanks Simon
>


Re: Accessing external DB inside RichFlatMap Function

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Simon,

If your job reads from or writes to a DB, I would suggest using one of the already existing Flink sources or sinks,
as this allows for efficient connection handling and management.

If you want to write intermediate data to a DB from an operator, then I suppose that you should implement 
your own custom operator that opens a DB connection in the open() method and closes it in close().
If you are planning to do so, I think the code of your custom operator would be the same as the StreamFlatMap,
with the addition of the openDBConnection method in the open(), and you should also override the close() method 
of the AbstractUdfStreamOperator to 1) call super.close() and 2) close the db connection.

Let me know if this works,
Kostas 
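
The close() ordering described above — first super.close() so the wrapped user function shuts down, then release the DB connection — can be modeled without Flink. BaseOperator below is an illustrative stand-in for AbstractUdfStreamOperator, not its real API.

```java
import java.util.ArrayList;
import java.util.List;

public class DbStreamOperator {
    static List<String> log = new ArrayList<>();

    /** Stand-in for AbstractUdfStreamOperator's lifecycle. */
    static class BaseOperator {
        void open() { log.add("base open"); }
        void close() { log.add("base close"); } // shuts down the user function
    }

    /** Custom operator: extra DB setup in open(), ordered teardown in close(). */
    static class DbOperator extends BaseOperator {
        @Override void open() {
            super.open();
            log.add("open db connection");   // the openDBConnection() step
        }
        @Override void close() {
            super.close();                   // 1) let the base class finish first
            log.add("close db connection");  // 2) then release the connection
        }
    }

    public static void main(String[] args) {
        DbOperator op = new DbOperator();
        op.open();
        op.close();
        System.out.println(log);
        // [base open, open db connection, base close, close db connection]
    }
}
```

Closing the connection only after super.close() matters because the user function may still emit (and hence hit the DB) while it is being shut down.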


> On Jul 7, 2016, at 10:38 AM, simon peyer <si...@soom-it.ch> wrote:
> 
> Hi guys
> 
> Is there an easy way to handle external DB connections inside a RichFlatMap Function?
> 
> --Thanks Simon


Accessing external DB inside RichFlatMap Function

Posted by simon peyer <si...@soom-it.ch>.
Hi guys

Is there an easy way to handle external DB connections inside a RichFlatMap Function?

--Thanks Simon

Re: How to avoid breaking states when upgrading Flink job?

Posted by Aljoscha Krettek <al...@apache.org>.
Also, you're using the FsStateBackend, correct?

Reason I'm asking is that the problem should not occur for the RocksDB
state backend. There, we don't serialize any user code, only binary data. A
while back I wanted to change the FsStateBackend to also work like this.
Now might be a good time to actually do this. :-)

On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <tr...@apache.org> wrote:

> Hi Josh,
>
> you could also try to replace your anonymous classes by explicit class
> definitions. This should assign these classes a fixed name independent of
> the other anonymous classes. Then the class loader should be able to
> deserialize your serialized data.
>
> Cheers,
> Till
>
> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi Josh,
>> I think in your case the problem is that Scala might choose different
>> names for synthetic/generated classes. This will trip up the code that is
>> trying to restore from a snapshot that was done with an earlier version of
>> the code where classes were named differently.
>>
>> I'm afraid I don't know how to solve this one right now, except by
>> switching to Java.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <mx...@apache.org> wrote:
>>
>>> Hi Josh,
>>>
>>> You have to assign UIDs to all operators to change the topology. Plus,
>>> you have to add dummy operators for all UIDs which you removed; this
>>> is a limitation currently because Flink will attempt to find all UIDs
>>> of the old job.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jo...@gmail.com> wrote:
>>> > Hi all,
>>> > Is there any information out there on how to avoid breaking saved
>>> > states/savepoints when making changes to a Flink job and redeploying
>>> it?
>>> >
>>> > I want to know how to avoid exceptions like this:
>>> >
>>> > java.lang.RuntimeException: Failed to deserialize state handle and
>>> setup
>>> > initial operator state.
>>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>> >       at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>> >
>>> >
>>> > The best information I could find in the docs is here:
>>> >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>> >
>>> >
>>> > Having made the suggested changes to my job (i.e. giving a uid to every
>>> > stateful sink and map function), what changes to the job/topology are
>>> then
>>> > allowed/not allowed?
>>> >
>>> >
>>> > If I'm 'naming' my states by providing uids, why does Flink need to
>>> look for
>>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>>> >
>>> >
>>> > Thanks for any advice,
>>> >
>>> > Josh
>>>
>>
>

Re: How to avoid breaking states when upgrading Flink job?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Josh,

you could also try to replace your anonymous classes by explicit class
definitions. This should assign these classes a fixed name independent of
the other anonymous classes. Then the class loader should be able to
deserialize your serialized data.

Cheers,
Till
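
The naming difference behind this advice can be seen with plain Java reflection, no Flink involved. The class names below are illustrative; the same effect produces the Scala $$anon$N names from the stack trace.

```java
import java.io.Serializable;
import java.util.function.Function;

public class ClassNames {
    /** Explicit class: its name is stable across refactorings of the job. */
    static class UpperCaseFn implements Function<String, String>, Serializable {
        @Override public String apply(String s) { return s.toUpperCase(); }
    }

    public static void main(String[] args) {
        Function<String, String> anonymous = new Function<String, String>() {
            @Override public String apply(String s) { return s.toUpperCase(); }
        };
        Function<String, String> named = new UpperCaseFn();

        // Compiler-assigned name like "ClassNames$1": adding, removing, or
        // reordering anonymous classes renumbers it, so a savepoint written
        // against the old name fails on restore with ClassNotFoundException.
        System.out.println(anonymous.getClass().getName());

        // Stable nested-class name "ClassNames$UpperCaseFn": survives changes
        // elsewhere in the file.
        System.out.println(named.getClass().getName());
    }
}
```

Moving each anonymous function into a named (top-level or static nested) class pins the class name that Java deserialization must resolve.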

On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Josh,
> I think in your case the problem is that Scala might choose different
> names for synthetic/generated classes. This will trip up the code that is
> trying to restore from a snapshot that was done with an earlier version of
> the code where classes were named differently.
>
> I'm afraid I don't know how to solve this one right now, except by
> switching to Java.
>
> Cheers,
> Aljoscha
>
> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <mx...@apache.org> wrote:
>
>> Hi Josh,
>>
>> You have to assign UIDs to all operators to change the topology. Plus,
>> you have to add dummy operators for all UIDs which you removed; this
>> is a limitation currently because Flink will attempt to find all UIDs
>> of the old job.
>>
>> Cheers,
>> Max
>>
>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jo...@gmail.com> wrote:
>> > Hi all,
>> > Is there any information out there on how to avoid breaking saved
>> > states/savepoints when making changes to a Flink job and redeploying it?
>> >
>> > I want to know how to avoid exceptions like this:
>> >
>> > java.lang.RuntimeException: Failed to deserialize state handle and setup
>> > initial operator state.
>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>> >       at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.lang.ClassNotFoundException:
>> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>> >
>> >
>> > The best information I could find in the docs is here:
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>> >
>> >
>> > Having made the suggested changes to my job (i.e. giving a uid to every
>> > stateful sink and map function), what changes to the job/topology are
>> then
>> > allowed/not allowed?
>> >
>> >
>> > If I'm 'naming' my states by providing uids, why does Flink need to
>> look for
>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>> >
>> >
>> > Thanks for any advice,
>> >
>> > Josh
>>
>

Re: How to avoid breaking states when upgrading Flink job?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Josh,
I think in your case the problem is that Scala might choose different names
for synthetic/generated classes. This will trip up the code that is trying
to restore from a snapshot that was done with an earlier version of the
code where classes were named differently.

I'm afraid I don't know how to solve this one right now, except by
switching to Java.

Cheers,
Aljoscha

On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <mx...@apache.org> wrote:

> Hi Josh,
>
> You have to assign UIDs to all operators to change the topology. Plus,
> you have to add dummy operators for all UIDs which you removed; this
> is a limitation currently because Flink will attempt to find all UIDs
> of the old job.
>
> Cheers,
> Max
>
> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jo...@gmail.com> wrote:
> > Hi all,
> > Is there any information out there on how to avoid breaking saved
> > states/savepoints when making changes to a Flink job and redeploying it?
> >
> > I want to know how to avoid exceptions like this:
> >
> > java.lang.RuntimeException: Failed to deserialize state handle and setup
> > initial operator state.
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
> >       at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
> >
> >
> > The best information I could find in the docs is here:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
> >
> >
> > Having made the suggested changes to my job (i.e. giving a uid to every
> > stateful sink and map function), what changes to the job/topology are
> then
> > allowed/not allowed?
> >
> >
> > If I'm 'naming' my states by providing uids, why does Flink need to look
> for
> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
> >
> >
> > Thanks for any advice,
> >
> > Josh
>

Re: How to avoid breaking states when upgrading Flink job?

Posted by Maximilian Michels <mx...@apache.org>.
Hi Josh,

You have to assign UIDs to all operators to change the topology. Plus,
you have to add dummy operators for all UIDs which you removed; this
is currently a limitation because Flink will attempt to find all UIDs
of the old job.

Cheers,
Max
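
The idea behind uids can be modeled with a plain map: a savepoint keyed by operator uid can be re-matched to operators even after the topology is rearranged, whereas matching by auto-generated names or positions breaks. All uids and state strings below are made up for illustration; this is not the real savepoint format.

```java
import java.util.HashMap;
import java.util.Map;

public class UidRestore {
    public static void main(String[] args) {
        // "Savepoint" taken from the old job: uid -> serialized state.
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("dedupe-map", "seen:42");
        savepoint.put("kafka-sink", "offset:1007");

        // The new job version declares the same uids, possibly in a new order
        // and with a new stateless operator inserted in between.
        String[] newTopologyUids = {"kafka-sink", "new-filter", "dedupe-map"};

        for (String uid : newTopologyUids) {
            // Match by uid, not by position in the topology.
            String state = savepoint.get(uid);
            System.out.println(uid + " -> " + (state == null ? "fresh" : state));
        }
    }
}
```

Note this only addresses *finding* the right state for each operator; as the rest of the thread shows, the state bytes must still deserialize, which is why anonymous-class renaming can break a restore even with uids set.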

On Wed, Jun 29, 2016 at 9:00 PM, Josh <jo...@gmail.com> wrote:
> Hi all,
> Is there any information out there on how to avoid breaking saved
> states/savepoints when making changes to a Flink job and redeploying it?
>
> I want to know how to avoid exceptions like this:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>
>
> The best information I could find in the docs is here:
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>
>
> Having made the suggested changes to my job (i.e. giving a uid to every
> stateful sink and map function), what changes to the job/topology are then
> allowed/not allowed?
>
>
> If I'm 'naming' my states by providing uids, why does Flink need to look for
> a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>
>
> Thanks for any advice,
>
> Josh