Posted to user@flink.apache.org by Jayant Ameta <wi...@gmail.com> on 2018/11/21 03:41:27 UTC

Store Predicate or any lambda in MapState

Hi,
I want to store a custom POJO in MapState. One of the fields in the
object is of type java.util.function.Predicate.
Flink throws a ClassNotFoundException on the lambda. How do I store
this object in the MapState?

Marking the predicate field as transient is an option. But in my use case,
the predicate is created by another library, and I don't want to call
that library every time I need the predicate.


Jayant Ameta

Re: Store Predicate or any lambda in MapState

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Also be careful when updating the version of the library (its jar).
If you rebuild code containing a lambda, the lambda's class name can change.
On recovery, Kryo might then read state with the old class name and no longer be able to find the class.
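
To illustrate the point about class names: a name such as MatcherFactory$$Lambda$879/1452224137 in the stack trace is generated by the JVM at runtime, not declared in source. The following is a minimal sketch using a hypothetical class LambdaClassName (not part of the original job) that prints such a generated name. The exact value can vary with the JVM version and the compiled code, so persisted state should not depend on it.

```java
import java.util.function.Predicate;

// Hypothetical demo class; the name is an assumption for illustration only.
final class LambdaClassName {

    // Returns the runtime class name that the JVM generated for a lambda.
    static String lambdaClassName() {
        Predicate<String> isEmpty = s -> s.isEmpty();
        return isEmpty.getClass().getName();
    }

    public static void main(String[] args) {
        // The printed name is synthetic and is not guaranteed to be stable.
        System.out.println(lambdaClassName());
    }
}
```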

I would rather save something that is easily (de)serialisable in state and transform it into what is needed (a predicate etc.) at runtime.
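
A minimal sketch of that idea, under stated assumptions: the class RuleSpec, its two fields and the operator names "startsWith" and "contains" are all hypothetical, and the example uses only the standard library rather than Flink classes. Only plain data is kept in the serialised form; the predicate is rebuilt on first use and cached in a transient field, so it is not recreated on every access.

```java
import java.io.Serializable;
import java.util.function.Predicate;

// Hypothetical value class: holds a plain-data description of a rule.
final class RuleSpec implements Serializable {
    private static final long serialVersionUID = 1L;

    // Plain data only; this is what would be written to state.
    private final String operator; // assumed values: "startsWith" or "contains"
    private final String operand;

    // Rebuilt lazily after deserialisation; never serialised.
    private transient Predicate<String> predicate;

    RuleSpec(String operator, String operand) {
        this.operator = operator;
        this.operand = operand;
    }

    // Builds the predicate from the stored description on first use and caches it.
    Predicate<String> toPredicate() {
        if (predicate == null) {
            switch (operator) {
                case "startsWith":
                    predicate = s -> s.startsWith(operand);
                    break;
                case "contains":
                    predicate = s -> s.contains(operand);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown operator: " + operator);
            }
        }
        return predicate;
    }

    public static void main(String[] args) {
        RuleSpec spec = new RuleSpec("startsWith", "ab");
        System.out.println(spec.toPredicate().test("abc"));
    }
}
```

In the original use case, the switch statement would be replaced by the call into the external library that creates the predicate.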

> On 27 Nov 2018, at 17:14, Andrey Zagrebin <an...@data-artisans.com> wrote:
> 
> Hi
> 
> It could just be a dependency problem if the library where the lambda is defined is not on the class path of the job.
> 
> On the other hand, we might want to investigate it, because Flink uses an older version of Kryo, 2.24.0.
> According to this issue [1], lambda support was added to Kryo later,
> but there is ‘com.twitter.chill.java.ClosureSerializer’ on the Flink class path, which could do it.
> 
> I think you could at least work around this by using a custom TypeSerializer for your state which simply uses native Java serialisation.
> 
> How do you define and use state?
> Can you post a minimal job example where it fails with this exception?
> There is not much Flink code in the truncated stack trace you provided.
> 
> Best,
> Andrey
> 
> [1] https://github.com/EsotericSoftware/kryo/issues/215
> 
>> On 27 Nov 2018, at 06:42, Jayant Ameta <wittyameta@gmail.com> wrote:
>> 
>> Any further help on this?
>> 
>> Jayant Ameta
>> 
>> 
>> On Wed, Nov 21, 2018 at 4:37 PM Jayant Ameta <wittyameta@gmail.com> wrote:
>> Here are the error logs.
>> 
>> The first error was logged when getting the values from the MapState.
>> 
>> java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879.1452224137
>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> 	... 22 common frames omitted
>> Wrapped by: java.lang.NoClassDefFoundError: com/test/MatcherFactory$$Lambda$879/1452224137
>> 	at sun.reflect.GeneratedSerializationConstructorAccessor282.newInstance(Unknown Source)
>> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> 	at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45)
>> 	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
>> 	... 17 frames truncated
>> 
>> Subsequent error logs were encountered on task manager restart (for the same job).
>> 
>> java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879/1452224137
>> 	at java.lang.Class.forName0(Class.java)
>> 	at java.lang.Class.forName(Class.java:348)
>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>> 	... 18 common frames omitted
>> Wrapped by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.test.MatcherFactory$$Lambda$879/1452224137
>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> 	... 8 frames truncated
>> 	... 6 common frames omitted
>> Wrapped by: java.lang.IllegalStateException: Could not initialize keyed state backend.
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> 	... 2 frames truncated
>> 
>>  
>> 
>> Jayant Ameta
>> 
>> 
>> On Wed, Nov 21, 2018 at 3:17 PM Dominik Wosiński <wossyn@gmail.com> wrote:
>> Hey Jayant,
>> 
>> I don't think that merely using a Predicate should cause the ClassNotFoundException that you are describing. The exception may occur because some libraries are missing from your cluster environment. Have you tried running the job locally to verify that the exception occurs there too? Also, could you please paste some logs here? They may help in determining the exact cause of the problem.
>> 
>> Best Regards,
>> Dom.
>> 
>> 
>> 
>> On Wed, 21 Nov 2018 at 04:41, Jayant Ameta <wittyameta@gmail.com> wrote:
>> Hi,
>> I want to store a custom POJO in MapState. One of the fields in the object is of type java.util.function.Predicate.
>> Flink throws a ClassNotFoundException on the lambda. How do I store this object in the MapState?
>> 
>> Marking the predicate field as transient is an option. But in my use case, the predicate is created by another library, and I don't want to call that library every time I need the predicate.
>> 
>> 
>> Jayant Ameta
> 


Re: Store Predicate or any lambda in MapState

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi

It could just be a dependency problem if the library where the lambda is defined is not on the class path of the job.

On the other hand, we might want to investigate it, because Flink uses an older version of Kryo, 2.24.0.
According to this issue [1], lambda support was added to Kryo later,
but there is ‘com.twitter.chill.java.ClosureSerializer’ on the Flink class path, which could do it.

I think you could at least work around this by using a custom TypeSerializer for your state which simply uses native Java serialisation.
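
As a rough sketch of what the native Java serialisation part could look like, the example below round-trips a lambda through ObjectOutputStream and ObjectInputStream. It deliberately uses no Flink classes: in a real job this logic would sit inside the serialize and deserialize methods of a custom TypeSerializer. The class JavaSerializationRoundTrip and the interface SerializablePredicate are hypothetical names introduced for illustration. Note the assumption that the predicate is Serializable; a lambda produced by a third-party library may not be, in which case this approach does not apply.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

// Hypothetical helper class; not part of Flink or of the original job.
final class JavaSerializationRoundTrip {

    // Assumed helper interface: a Predicate that is also Serializable.
    interface SerializablePredicate<T> extends Predicate<T>, Serializable {}

    // Writes the value with standard Java serialisation.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads a value back; the caller decides the expected type.
    @SuppressWarnings("unchecked")
    static <T> T fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialise value", e);
        }
    }

    // Serialises a lambda, restores it, and applies the restored copy to the input.
    static boolean roundTripMatches(String input) {
        SerializablePredicate<String> original = s -> s.startsWith("ab");
        SerializablePredicate<String> restored = fromBytes(toBytes(original));
        return restored.test(input);
    }

    public static void main(String[] args) {
        System.out.println(roundTripMatches("abc"));
    }
}
```

Java serialisation stores a description of a serialisable lambda (its capturing class and implementation method) rather than the generated class name, which is why it avoids the lookup that fails in the Kryo stack trace. The class that defines the lambda still has to be on the class path when the state is read.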

How do you define and use state?
Can you post a minimal job example where it fails with this exception?
There is not much Flink code in the truncated stack trace you provided.

Best,
Andrey

[1] https://github.com/EsotericSoftware/kryo/issues/215



Re: Store Predicate or any lambda in MapState

Posted by Jayant Ameta <wi...@gmail.com>.
Any further help on this?

Jayant Ameta



Re: Store Predicate or any lambda in MapState

Posted by Jayant Ameta <wi...@gmail.com>.
Here are the error logs.

The first error was logged when getting the values from the MapState.

java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879.1452224137
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 22 common frames omitted
Wrapped by: java.lang.NoClassDefFoundError: com/test/MatcherFactory$$Lambda$879/1452224137
	at sun.reflect.GeneratedSerializationConstructorAccessor282.newInstance(Unknown Source)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
	... 17 frames truncated

Subsequent error logs were encountered on task manager restart (for the same job).

java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879/1452224137
	at java.lang.Class.forName0(Class.java)
	at java.lang.Class.forName(Class.java:348)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
	... 18 common frames omitted
Wrapped by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.test.MatcherFactory$$Lambda$879/1452224137
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
	... 8 frames truncated
	... 6 common frames omitted
Wrapped by: java.lang.IllegalStateException: Could not initialize keyed state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	... 2 frames truncated




Jayant Ameta



Re: Store Predicate or any lambda in MapState

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey Jayant,

I don't think that merely using a Predicate should cause the
*ClassNotFoundException* that you are describing. The exception may occur
because some libraries are missing from your cluster environment. Have you
tried running the job locally to verify that the exception occurs there
too? Also, could you please paste some logs here? They may help in
determining the exact cause of the problem.

Best Regards,
Dom.


