Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2019/07/04 09:25:51 UTC

Re: Debug Kryo.Serialization Exception

Any news on this? Have you found the cause of the error?

On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <po...@okkam.it>
wrote:

> Indeed, looking at StreamElementSerializer, the duplicate() method might be
> buggy:
>
> @Override
> public StreamElementSerializer<T> duplicate() {
>       TypeSerializer<T> copy = typeSerializer.duplicate();
>       return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
> }
>
> Is it safe to return this when copy == typeSerializer?
>
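A self-contained sketch of the pattern Flavio quotes, using hypothetical `Serializer`, `LongCodec`, `StatefulCodec` and `ElementWrapper` types rather than Flink's real classes. It assumes the contract that a serializer without mutable state may return itself from `duplicate()`, while a stateful one must return an independent copy. Under that assumption, returning `this` from the wrapper is safe exactly when the inner serializer returned itself, because the wrapper adds no mutable state of its own.

```java
import java.nio.ByteBuffer;

// Hypothetical minimal serializer interface (not Flink's TypeSerializer).
interface Serializer<T> {
    byte[] serialize(T value);

    // Assumed contract: stateless serializers may return `this`,
    // stateful ones must return an independent copy.
    Serializer<T> duplicate();
}

// Stateless: no fields, so one instance can be shared by many threads.
final class LongCodec implements Serializer<Long> {
    @Override
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Serializer<Long> duplicate() {
        return this; // nothing mutable to protect
    }
}

// Stateful: reuses a scratch buffer, so each owner needs its own instance.
final class StatefulCodec implements Serializer<Long> {
    private final ByteBuffer scratch = ByteBuffer.allocate(Long.BYTES);

    @Override
    public byte[] serialize(Long value) {
        scratch.clear();
        scratch.putLong(value);
        return scratch.array().clone();
    }

    @Override
    public Serializer<Long> duplicate() {
        return new StatefulCodec(); // fresh scratch buffer
    }
}

// Wrapper mirroring the quoted duplicate() logic.
final class ElementWrapper<T> implements Serializer<T> {
    final Serializer<T> inner;

    ElementWrapper(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(T value) {
        return inner.serialize(value);
    }

    @Override
    public ElementWrapper<T> duplicate() {
        Serializer<T> copy = inner.duplicate();
        // Same inner instance means the inner serializer declared itself shareable;
        // the wrapper holds no other state, so it is shareable too.
        return (copy == inner) ? this : new ElementWrapper<>(copy);
    }
}
```

With a stateless inner codec the wrapper returns itself; with the stateful one, each duplicate gets its own scratch buffer. The reasoning only holds if every inner serializer honours the contract: a stateful serializer that wrongly returned `this` would leak shared state through the wrapper as well.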
> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> Hi Fabian,
>> we had similar errors with Flink 1.3 [1][2], and the error was caused by
>> a serializer sharing the same object across multiple threads.
>> The error was not deterministic and changed from time to time,
>> so maybe this is something similar (IMHO).
>>
>> [1] http://codeha.us/apache-flink-users/msg02010.html
>> [2] http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3cCAELUF_AiC_izyW5F27KnTER_y6h4+nzG2CpniozQdgm+Wk7xVw@mail.gmail.com%3e
>>
>> Best,
>> Flavio
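Flavio's earlier diagnosis, a serializer object shared by several threads, is a hazard for any serializer that keeps internal state between calls. The sketch below shows the usual remedy in plain Java: give each thread its own instance (here via `ThreadLocal`) instead of sharing one. `Codec` and `PerThreadCodec` are hypothetical; in Flink the equivalent isolation is meant to come from `duplicate()`, so this illustrates the principle, not Flink's mechanism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical codec with a reusable scratch buffer: NOT thread-safe.
final class Codec {
    private final StringBuilder scratch = new StringBuilder();

    String encode(String key, int value) {
        scratch.setLength(0); // two threads sharing this instance could interleave here
        scratch.append(key).append('=').append(value);
        return scratch.toString();
    }
}

final class PerThreadCodec {
    // Each thread lazily creates its own Codec, so the scratch buffer is never shared.
    private static final ThreadLocal<Codec> CODEC = ThreadLocal.withInitial(Codec::new);

    static String encode(String key, int value) {
        return CODEC.get().encode(key, value);
    }

    // Encodes n records on each of `threads` workers; returns how many outputs were wrong.
    static int countCorrupted(int threads, int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final String key = "k" + t;
                results.add(pool.submit(() -> {
                    int bad = 0;
                    for (int i = 0; i < n; i++) {
                        if (!encode(key, i).equals(key + "=" + i)) bad++;
                    }
                    return bad;
                }));
            }
            int total = 0;
            for (Future<Integer> f : results) total += f.get();
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

If all threads instead shared a single `Codec`, the same loop could produce wrong strings or even exceptions, varying from run to run, which resembles the non-deterministic symptoms described in this thread.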
>>
>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert <fa...@zalando.de> wrote:
>>
>>> Additionally, we constantly see these exceptions along with it:
>>>
>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
>>> Serialization trace:
>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> 	at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>>
>>> or
>>>
>>>
>>> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>> Serialization trace:
>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> 	at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>> 	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>> 	at java.util.ArrayList.get(ArrayList.java:433)
>>> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>> 	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>> 	... 12 more
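The `Index: 97, Size: 29` failure comes from Kryo's reference tracking: the stack trace shows `MapReferenceResolver.getReadObject` calling `ArrayList.get`, i.e. a back-reference read from the stream is used as an index into the list of objects read so far. The simplified model below (hypothetical, not Kryo's code) shows why a reader that is out of step with the writer fails this way: whatever bytes happen to be read as a reference id are used as an index without validation. This is consistent with a corrupted or misaligned byte stream, though it does not by itself show where the corruption originates.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a read-side reference table (not Kryo's implementation).
final class ReadReferenceTable {
    private final List<Object> readObjects = new ArrayList<>();

    // Called whenever a new object has been fully read from the stream.
    int register(Object o) {
        readObjects.add(o);
        return readObjects.size() - 1;
    }

    // A back-reference in the stream is only an index; it cannot be validated,
    // so a misread id surfaces as an IndexOutOfBoundsException.
    Object resolve(int id) {
        return readObjects.get(id);
    }

    // Registers `registered` objects, then tries to resolve `bogusId`.
    static String simulateMisalignedRead(int registered, int bogusId) {
        ReadReferenceTable table = new ReadReferenceTable();
        for (int i = 0; i < registered; i++) table.register("obj" + i);
        try {
            table.resolve(bogusId);
            return "resolved";
        } catch (IndexOutOfBoundsException e) {
            return e.getMessage();
        }
    }
}
```

On Java 8, `simulateMisalignedRead(29, 97)` returns `Index: 97, Size: 29`, as in the trace above; newer JDKs word the message differently (`Index 97 out of bounds for length 29`).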
>>>
>>>
>>> --
>>>
>>>
>>> *Fabian Wollert, Zalando SE*
>>>
>>> E-Mail: fabian@zalando.de
>>> Phone: +49 152 03479412
>>>
>>>
>>>
>>> On Thu, Jun 27, 2019 at 6:29 PM Fabian Wollert <fabian@zalando.de> wrote:
>>>
>>>> Hi, we have some Flink jobs (Flink 1.7.1) consuming from a custom
>>>> source and ingesting into an Elasticsearch cluster (v5.6). Recently, we
>>>> have been seeing more and more exceptions like this:
>>>>
>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ClassNotFoundException: com. ^
>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> 	at java.lang.Class.forName0(Native Method)
>>>> 	at java.lang.Class.forName(Class.java:348)
>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>
>>>> ... 13 more
>>>>
>>>> or
>>>>
>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com.fasterxml.jackson.databind.node.DoubleNod    com.fasterxml.jackson.databind.node.ObjectNode
>>>> Serialization trace:
>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.node.DoubleNod    com.fasterxml.jackson.databind.node.ObjectNode
>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> 	at java.lang.Class.forName0(Native Method)
>>>> 	at java.lang.Class.forName(Class.java:348)
>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>
>>>> ... 19 more
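The garbled class names above (`com. ^`, or `DoubleNod` spliced together with `ObjectNode`) also look like a reader running out of step with the data rather than a genuinely missing class. A hypothetical illustration: in an encoding where a string has no length prefix and its last character carries an end-of-string flag (an assumption made for this sketch, not a description of Kryo's exact wire format), damaging a single byte makes the reader run on into the next name, and `Class.forName` then throws `ClassNotFoundException`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical ASCII string encoding: no length prefix; the last character of
// each string has its high bit set to mark the end.
final class MarkedAscii {
    static void write(ByteArrayOutputStream out, String s) {
        byte[] b = s.getBytes(StandardCharsets.US_ASCII);
        if (b.length == 0) throw new IllegalArgumentException("empty name");
        b[b.length - 1] |= (byte) 0x80; // flag the final character
        out.write(b, 0, b.length);
    }

    // Reads one string starting at pos[0] and advances pos[0] past it.
    static String read(byte[] data, int[] pos) {
        StringBuilder sb = new StringBuilder();
        while (pos[0] < data.length) {
            int b = data[pos[0]++] & 0xFF;
            sb.append((char) (b & 0x7F));
            if ((b & 0x80) != 0) break; // end marker found
        }
        return sb.toString();
    }

    // True if the decoded name can be loaded as a class.
    static boolean loadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Writing `java.lang.Double` then `java.lang.Object` and clearing the flag on the first name's last byte makes `read` return `java.lang.Doublejava.lang.Object`, which cannot be loaded, even though both real classes exist.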
>>>>
>>>> I suspect that somewhere the serialization between steps in the
>>>> TaskManager fails. Unfortunately, it happens very unpredictably. My
>>>> question is: has anyone seen this before? If so, how did you approach
>>>> debugging it? Right now we see this problem mostly during high-volume
>>>> event processing; it only appears under high load. I already tried to
>>>> investigate with TRACE log level, but writing tons of logs keeps the
>>>> instance busy, which slows down processing, and then the exception is no
>>>> longer triggered. Is there another way to investigate this?
>>>>
>>>> Thanks in advance for any hints on how to debug this.
>>>>
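One generic alternative to TRACE logging (a suggestion, not something proposed in the thread): keep only the last N events in memory and log them once an exception is observed, so the normal path pays for a bounded in-memory buffer instead of disk I/O. `RecentEvents` is a hypothetical helper. It can only help in code you control, for example a map function just before the sink, because the failing deserialization in the traces above happens inside Flink's network stack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Keeps only the last `capacity` events; intended to be dumped on failure.
final class RecentEvents<T> {
    private final int capacity;
    private final Deque<T> events = new ArrayDeque<>();

    RecentEvents(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    synchronized void record(T event) {
        if (events.size() == capacity) events.removeFirst(); // drop the oldest event
        events.addLast(event);
    }

    // Oldest first; call this from an exception handler to log the context.
    synchronized List<T> snapshot() {
        return new ArrayList<>(events);
    }
}
```

Recording the events 1 to 5 into a buffer of capacity 3 leaves `[3, 4, 5]` in the snapshot.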
>>>>
>>>
>>
>
>

Re: Debug Kryo.Serialization Exception

Posted by Fabian Wollert <fa...@zalando.de>.
*@Fabian do you register any types / serializers via
ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?*

Nope, not at all. The word "Kryo" does not appear anywhere in our Flink job code.

Thanks for looking into it.


On Thu, Jul 4, 2019 at 11:51 AM Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:

> I quickly checked the implementation of duplicate() for both the
> KryoSerializer and StreamElementSerializer (which are the only serializers
> involved here).
> They seem to be correct; especially for the KryoSerializer, since
> FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
> duplicating it, and therefore Kryo instances should not be shared at all
> across duplicates.
> This seems to rule out any duplication issues with the serializers.
>
> A possibly relevant question: @Fabian, do you register any types /
> serializers via ExecutionConfig.registerKryoType(...) /
> ExecutionConfig.registerTypeWithKryoSerializer(...)?
>
> Best,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert <fa...@zalando.de> wrote:
>
>> No, not yet. We lack the knowledge to understand this. The only thing
>> we found out is that it most probably happens in the Elasticsearch sink,
>> because:
>> - some error messages have the sink in their stack trace;
>> - when we increase the ES node specs on AWS, the error happens less often
>> (we haven't scaled up to very large instances yet, nor reached a state
>> where the errors go away completely; also, this would not be the ideal fix).
>>
>> So my current assumption is that backpressure is not being applied
>> correctly. But this is very vague; any other hints or support on this are
>> highly appreciated.
>>
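Fabian's backpressure hypothesis rests on the idea that a fast producer must be slowed down when the consumer (here, the Elasticsearch sink) cannot keep up. A minimal sketch of that mechanism in plain Java, using a bounded `ArrayBlockingQueue`: `put` blocks once the buffer is full, so the producer can never run more than `capacity` elements ahead. This is a generic illustration only; it does not model Flink's network buffers or the Elasticsearch sink, and `BoundedPipeline` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPipeline {
    // Runs a fast producer against a slow consumer through a bounded buffer and
    // returns the largest number of elements ever observed waiting in the buffer.
    static int maxBuffered(int capacity, int items) throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxSeen = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    buffer.take();
                    Thread.sleep(1); // simulate a slow sink
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < items; i++) {
            buffer.put(i); // blocks while the buffer is full: this is the backpressure
            maxSeen.accumulateAndGet(buffer.size(), Math::max);
        }
        consumer.join();
        return maxSeen.get();
    }
}
```

However slow the consumer is, `maxBuffered(4, 50)` never reports more than 4 buffered elements; the producer simply waits.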
>>
>>

Re: Debug Kryo.Serialization Exception

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
I quickly checked the implementation of duplicate() for both the
KryoSerializer and StreamElementSerializer (which are the only serializers
involved here).
They seem to be correct; especially for the KryoSerializer, since
FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
duplicating it, and therefore Kryo instances should not be shared at all
across duplicates.
This seems to rule out any duplication issues with the serializers.

A possibly relevant question: @Fabian, do you register any types /
serializers via ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?

Best,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-8836
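Gordon's argument relies on `duplicate()` performing a deep copy since FLINK-8836. A hypothetical sketch of what a deep-copying `duplicate()` looks like (the class names, the registration map and the lazily built `Engine` are illustrative stand-ins, not Flink's `KryoSerializer` internals): configuration is copied into a fresh map, and the stateful engine is never copied but rebuilt lazily by each duplicate, so two duplicates cannot share it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for a stateful, non-thread-safe engine such as a Kryo instance.
final class Engine {
    final Map<String, Integer> registrations;

    Engine(Map<String, Integer> registrations) {
        this.registrations = new LinkedHashMap<>(registrations);
    }
}

final class DeepCopySerializer {
    private final Map<String, Integer> registrations; // configuration, copied on duplicate
    private Engine engine;                             // created lazily, never shared

    DeepCopySerializer(Map<String, Integer> registrations) {
        this.registrations = new LinkedHashMap<>(registrations);
    }

    Engine engine() {
        if (engine == null) engine = new Engine(registrations);
        return engine;
    }

    // Deep copy: a fresh configuration map and no engine, so the duplicate
    // builds its own engine on first use.
    DeepCopySerializer duplicate() {
        return new DeepCopySerializer(registrations);
    }

    void register(String className, int id) {
        registrations.put(className, id);
        engine = null; // rebuild with the new registration on next use
    }
}
```

Registering a class on a duplicate changes only that duplicate's engine, and the original and the duplicate always hold different `Engine` instances.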

On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert <fa...@zalando.de> wrote:

> No, not yet. We lack some knowledge in understanding this. The only thing
> we found out that it happens most probably in the Elasticsearch Sink,
> because:
> - some error messages have the sink in their stack trace.
> - when bumping the ES nodes specs on AWS, the error happens less often (we
> haven't bumped it to super large instances yet, nor got to a state where
> they go away completely. also this would not be the ideal fix)
>
> so my current assumption is that some backpressuring is not happening
> correctly. but this is super vaguely, any other hints or support on this is
> highly appreciated.
>
> --
>
>
> *Fabian WollertZalando SE*
>
> E-Mail: fabian@zalando.de
>
>
> Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
> pompermaier@okkam.it>:
>
>> Any news on this? Have you found the cause of the error?
>>
>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>>> Indeed looking at StreamElementSerializer the duplicate() method could
>>> be bugged:
>>>
>>> @Override
>>> public StreamElementSerializer<T> duplicate() {
>>>       TypeSerializer<T> copy = typeSerializer.duplicate();
>>>       return (copy == typeSerializer) ? this : new
>>> StreamElementSerializer<T>(copy);
>>> }
>>>
>>> Is ti safe to return this when copy == typeSerializer ...?
>>>
>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <po...@okkam.it>
>>> wrote:
>>>
>>>> Hi Fabian,
>>>> we had similar errors with Flink 1.3 [1][2] and the error was caused by
>>>> the fact that a serialised was sharing the same object with multiple
>>>> threads.
>>>> The error was not deterministic and was changing from time to time.
>>>> So maybe it could be something similar (IMHO).
>>>>
>>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>>> [2]
>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3cCAELUF_AiC_izyW5F27KnTER_y6h4+nzG2CpniozQdgm+Wk7xVw@mail.gmail.com%3e
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert <fa...@zalando.de>
>>>> wrote:
>>>>
>>>>> additionally we have these coming with this as well all the time:
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
>>>>> Serialization trace:
>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>>
>>>>> or
>>>>>
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>> Serialization trace:
>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>> 	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>> 	at java.util.ArrayList.get(ArrayList.java:433)
>>>>> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>> 	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>>>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>> 	... 12 more
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>> *Fabian Wollert*
>>>>> *Zalando SE*
>>>>>
>>>>> E-Mail: fabian@zalando.de
>>>>> Phone: +49 152 03479412
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 27, 2019 at 6:29 PM Fabian Wollert <
>>>>> fabian@zalando.de> wrote:
>>>>>
>>>>>> Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a
>>>>>> custom source and ingesting into an Elasticsearch cluster (v5.6).
>>>>>> Recently, we have been seeing more and more exceptions like this:
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ClassNotFoundException: com. ^
>>>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>> 	at java.lang.Class.forName0(Native Method)
>>>>>> 	at java.lang.Class.forName(Class.java:348)
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>> 	... 13 more
>>>>>>
>>>>>> or
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com.fasterxml.jackson.databind.node.DoubleNod    com.fasterxml.jackson.databind.node.ObjectNode
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.node.DoubleNod    com.fasterxml.jackson.databind.node.ObjectNode
>>>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>> 	at java.lang.Class.forName0(Native Method)
>>>>>> 	at java.lang.Class.forName(Class.java:348)
>>>>>> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>> 	... 19 more
>>>>>>
>>>>>> I guess the serialization between steps in the TaskManager fails
>>>>>> somewhere. Unfortunately, it happens very unpredictably. My question is:
>>>>>> has anyone seen this before? If so, how did you approach debugging it?
>>>>>> Right now we see this problem mostly during high-volume event
>>>>>> processing; it only appears under high load. I already tried to
>>>>>> investigate with the TRACE log level, but then the instance is kept so
>>>>>> busy writing tons of logs that processing slows down and the exception
>>>>>> is no longer triggered. I'm wondering whether there is another way to
>>>>>> investigate this.
>>>>>>
>>>>>> Thanks in advance for any hints on how to debug this.
>>>>>>
>>>>>> --
>>>>>>
>>>>>>
>>>>>> *Fabian Wollert*
>>>>>> *Zalando SE*
>>>>>>
>>>>>> E-Mail: fabian@zalando.de
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
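A note on the `duplicate()` question in Flavio's message: the Javadoc of Flink's `TypeSerializer#duplicate()` says a serializer may return itself if it is not stateful and must return a deep copy if it is, because serializers can be used by several threads. Under that contract, the `(copy == typeSerializer) ? this : ...` shortcut in `StreamElementSerializer` only shares an instance when the wrapped serializer declared itself stateless; `KryoSerializer` is stateful (it holds a Kryo instance), so it is expected to hand out a new copy. The following stdlib-only Java sketch models that reasoning. `Serializer`, `StatefulSerializer`, `StatelessSerializer` and `WrappingSerializer` are hypothetical stand-ins, not Flink classes, and the sketch does not show where the corruption in this thread actually comes from.

```java
import java.util.ArrayList;
import java.util.List;

/** Stdlib-only model of the duplicate() pattern; all types here are hypothetical, not Flink's. */
public class DuplicateContractSketch {

    /** Stand-in for a serializer; only the duplicate() contract is modelled. */
    interface Serializer<T> {
        Serializer<T> duplicate();
    }

    /** Stateful: owns mutable per-instance state (similar in spirit to Kryo's table of read objects). */
    static final class StatefulSerializer implements Serializer<String> {
        final List<Object> readObjects = new ArrayList<>();

        @Override
        public Serializer<String> duplicate() {
            return new StatefulSerializer(); // fresh state, so threads never share readObjects
        }
    }

    /** Stateless: nothing mutable, so sharing one instance between threads is harmless. */
    static final class StatelessSerializer implements Serializer<String> {
        @Override
        public Serializer<String> duplicate() {
            return this;
        }
    }

    /** Wrapper with the same shape as the StreamElementSerializer.duplicate() quoted in the thread. */
    static final class WrappingSerializer<T> implements Serializer<T> {
        final Serializer<T> inner;

        WrappingSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public Serializer<T> duplicate() {
            Serializer<T> copy = inner.duplicate();
            // Sharing "this" is only correct if the inner serializer returned itself,
            // i.e. declared itself stateless; otherwise wrap the fresh copy.
            return (copy == inner) ? this : new WrappingSerializer<T>(copy);
        }
    }

    public static void main(String[] args) {
        WrappingSerializer<String> stateful = new WrappingSerializer<>(new StatefulSerializer());
        WrappingSerializer<String> stateless = new WrappingSerializer<>(new StatelessSerializer());

        System.out.println("stateful wrapper copied: " + (stateful.duplicate() != stateful));
        System.out.println("stateless wrapper shared: " + (stateless.duplicate() == stateless));
    }
}
```

If this model holds, the shortcut itself is harmless; it only becomes a problem when some serializer in the chain returns `this` from `duplicate()` while still owning mutable state, which is the kind of cross-thread sharing Flavio describes from Flink 1.3.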

Re: Debug Kryo.Serialization Exception

Posted by Fabian Wollert <fa...@zalando.de>.
No, not yet. We lack the knowledge to fully understand this. The only thing
we found out is that it most probably happens in the Elasticsearch sink,
because:
- some error messages have the sink in their stack trace;
- when we bump the specs of the ES nodes on AWS, the error happens less
  often (we haven't bumped them to very large instances yet, nor reached a
  state where the errors go away completely; also, this would not be the
  ideal fix).

So my current assumption is that backpressure is not being applied correctly
somewhere. But this is very vague; any other hints or support are highly
appreciated.
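On finding another way to investigate than TRACE logging: one hypothesis in this thread is that a serializer instance ends up being used by more than one thread. A cheap way to test that under real load is a fail-fast guard that remembers the first thread touching an object and throws as soon as a different thread touches it; it adds only a few atomic operations per call instead of a log line per record. `ThreadConfinementGuard` below is a hypothetical helper, not part of Flink or Kryo, and calling `check()` at the start of `read`/`write` in a Kryo serializer you control (for example one registered for `ObjectNode` via `ExecutionConfig#registerTypeWithKryoSerializer`) is an assumed integration point. A violation is a strong hint rather than proof, since a legitimate hand-off between threads would trigger it as well.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical debugging helper (not part of Flink or Kryo): detects use of one object from two threads. */
public final class ThreadConfinementGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final String name;

    public ThreadConfinementGuard(String name) {
        this.name = name;
    }

    /** Call at the start of every read/write of the guarded object; throws when a second thread calls it. */
    public void check() {
        Thread current = Thread.currentThread();
        // The first caller atomically becomes the owner.
        if (owner.compareAndSet(null, current)) {
            return;
        }
        Thread first = owner.get();
        if (first != current) {
            throw new IllegalStateException(name + " used by thread " + current.getName()
                    + " but first used by thread " + first.getName());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadConfinementGuard guard = new ThreadConfinementGuard("sharedSerializer");
        guard.check(); // the main thread becomes the owner
        guard.check(); // same thread again: no error

        Thread worker = new Thread(() -> {
            try {
                guard.check();
                System.out.println("no violation detected");
            } catch (IllegalStateException e) {
                System.out.println("violation: " + e.getMessage());
            }
        }, "worker-1");
        worker.start();
        worker.join();
    }
}
```

Run as-is, the worker thread's call is reported as a violation, because the main thread used the guard first.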

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fabian@zalando.de

