Posted to user@flink.apache.org by Rinat <r....@cleverdata.ru> on 2019/02/12 18:56:27 UTC

In-Memory state serialization with kryo fails

Hi mates!

I’ve implemented a job that stores its progress in a MapState[K, V], where K is java.lang.String and V is a collection of typed objects, java.util.List[SomeClass[_]].
When Flink serializes this state, it falls back to the Kryo serializer for the value object, which fails with a StackOverflowError:

java.lang.StackOverflowError
	at java.util.HashMap.hash(HashMap.java:338)
	at java.util.HashMap.get(HashMap.java:556)
	at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:43)
 
This problem is related to a known Kryo bug (https://github.com/EsotericSoftware/kryo/issues/341) and only shows up when SomeClass is typed with java.util.BitSet.

I’ve checked my job locally (from the IDE) with the latest Kryo library (4.0.2 <https://mvnrepository.com/artifact/com.esotericsoftware/kryo/4.0.2>), and it works fine. However, I can’t change the Kryo version in distributed mode, because Kryo is packaged into the fat jar (flink-dist_2.11-1.6.1.jar) that contains all of Flink’s runtime dependencies.

Could you give me any advice on how to solve this issue, or on how to register a separate serializer for this case?

Thx for your help.


Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.sharipov@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever


Re: In-Memory state serialization with kryo fails

Posted by Rinat <r....@cleverdata.ru>.
Hi Gordon, thanks for your time. I’ll try to find another suitable serializer.

> On 13 Feb 2019, at 07:25, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:



Re: In-Memory state serialization with kryo fails

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I would suggest avoiding Kryo for state serialization, especially if this
job is meant for production use. It can get in the way later on, when you
decide to evolve the schema of your value state.

To avoid Kryo, provide a specific serializer for your value type
(*java.util.List[SomeClass[_]]*) when declaring the descriptor for your
MapState. You should be able to use Flink's ListSerializer for this.
Providing a specific serializer bypasses Flink's type extraction for your
state, which otherwise falls back to the KryoSerializer for types it does
not recognize.
You can find more information about custom state serialization here [1].
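A minimal sketch of the kind of explicit binary encoding a dedicated serializer could use for this value, written in plain Java with no Flink dependency so that it runs standalone. It assumes, for illustration only, that the list elements are the java.util.BitSet payloads themselves (the fields of SomeClass are not shown in the thread), and BitSetListCodec is a hypothetical name. Each BitSet is written as a word count followed by its 64-bit words, and the list as an element count followed by its elements, a length-prefixed layout similar to what a list serializer typically writes. The methods target java.io.DataOutput/DataInput, which Flink's DataOutputView/DataInputView extend.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper (not part of Flink): explicit encoding for List<BitSet>
// values, so no reflective Kryo serialization is involved.
final class BitSetListCodec {

    private BitSetListCodec() {}

    // Writes a BitSet as its word count followed by its 64-bit words.
    static void writeBitSet(BitSet bits, DataOutput out) throws IOException {
        long[] words = bits.toLongArray();
        out.writeInt(words.length);
        for (long word : words) {
            out.writeLong(word);
        }
    }

    // Reads a BitSet written by writeBitSet.
    static BitSet readBitSet(DataInput in) throws IOException {
        int length = in.readInt();
        long[] words = new long[length];
        for (int i = 0; i < length; i++) {
            words[i] = in.readLong();
        }
        return BitSet.valueOf(words);
    }

    // Length-prefixed list: element count, then each element in order.
    static void writeList(List<BitSet> list, DataOutput out) throws IOException {
        out.writeInt(list.size());
        for (BitSet bits : list) {
            writeBitSet(bits, out);
        }
    }

    // Reads a list written by writeList.
    static List<BitSet> readList(DataInput in) throws IOException {
        int size = in.readInt();
        List<BitSet> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(readBitSet(in));
        }
        return result;
    }

    // In-memory round trip through a byte array, useful for testing the codec.
    static List<BitSet> roundTrip(List<BitSet> list) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeList(list, out);
        }
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return readList(in);
        }
    }

    public static void main(String[] args) throws IOException {
        BitSet flags = new BitSet();
        flags.set(1);
        flags.set(130); // spans three 64-bit words
        List<BitSet> original = Arrays.asList(flags, new BitSet());
        System.out.println(roundTrip(original).equals(original)); // prints "true"
    }
}
```

In a real job, writeBitSet/readBitSet would be called from the serialize/deserialize methods of a custom TypeSerializer for the element type. That serializer could then be wrapped in Flink's ListSerializer and passed to the MapStateDescriptor together with a String key serializer. The remaining TypeSerializer methods (copying, snapshotting, compatibility checks) differ between Flink versions and are not shown here.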

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html
