Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2021/12/19 23:18:17 UTC

Kryo EOFException: No more bytes left

Hi.

I'm curious whether anyone else has hit this exception.  I'm using the
IntervalJoinOperator to join two streams of protos.  I registered the protos
with a Kryo serializer.  I started hitting this issue, which looks like the
operator is failing to deserialize bytes that it serialized itself.
I'm not doing anything weird or custom in the code.  It's a pretty simple
interval join.

Has anyone hit this before?  How have people solved it?  I skimmed the
operator code and don't see an easy way to exclude the badly serialized
bytes.

A couple of ideas:
1. I could fork the interval join code and have a route to handle bad
serialization.
2. Maybe there's a weird case where the bytes become empty and this is an
exception given for an empty array of bytes?
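
On idea 2, here is a minimal sketch, using only the JDK, of how a
length-prefixed read behaves when the payload is empty versus when the record
is cut short.  It is an illustration under stated assumptions, not chill's or
Flink's actual code: the class LengthPrefixedReadDemo and its write/read
methods are hypothetical, and the 4-byte length prefix is an assumed format.
The trace below fails inside a readBytes call, which would be consistent with
a reader asking for more bytes than the buffer holds.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

/** Hypothetical demo class; not part of Flink, Kryo, or chill. */
public class LengthPrefixedReadDemo {

    /** Writes a 4-byte length prefix followed by the payload bytes. */
    public static byte[] write(byte[] payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        return buffer.toByteArray();
    }

    /** Reads the length prefix, then exactly that many payload bytes. */
    public static byte[] read(byte[] serialized) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        int length = in.readInt();
        byte[] payload = new byte[length];
        // readFully throws EOFException if fewer than `length` bytes remain.
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // An empty payload round-trips without error.
        System.out.println("empty payload length: " + read(write(new byte[0])).length);

        // Dropping the last byte of a non-empty record triggers EOFException.
        byte[] full = write(new byte[] {1, 2, 3});
        byte[] truncated = Arrays.copyOf(full, full.length - 1);
        try {
            read(truncated);
            System.out.println("unexpected: truncated record was read");
        } catch (EOFException e) {
            System.out.println("truncated record: EOFException");
        }
    }
}
```

Under this assumed format, an empty payload by itself would not raise the
exception; a record that was cut short, or a length prefix read from the wrong
offset, would.  Whether that matches what happened in the job is not something
the trace alone can confirm.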

Could this be a version issue?  My Flink version is v1.12.3 and
Twitter/chill v0.9.4.

Thoughts?


java.lang.RuntimeException: Could not create class com.myexample.proto.MyJoinedOutput
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:452) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator$BufferEntrySerializer.deserialize(IntervalJoinOperator.java:401) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:393) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:127) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.addToBuffer(IntervalJoinOperator.java:282) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:234) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:194) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.3.jar:1.12.3]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.3.jar:1.12.3]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
        ... 27 more
Caused by: java.io.EOFException: No more bytes left.
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.12.3.jar:1.12.3]
        at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[blob_p-b0913791f2636dfd2781bd693704accd5ae305e8-26c95e4b9b0bf7dc8409a4d2bd4c57bf:?]
        ... 27 more

Re: Kryo EOFException: No more bytes left

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Dan,
In my experience, this kind of error is usually caused by some other problem
that's not immediately obvious (such as a serialization, memory, or RocksDB
problem).
Could it be that an Avro field cannot be null, or vice versa?

On Tue, Dec 21, 2021 at 7:21 PM Dan Hill <qu...@gmail.com> wrote:

> I was not able to reproduce it by re-running the same job with an updated
> kryo library.  The join doesn't do anything special.

Re: Kryo EOFException: No more bytes left

Posted by Dan Hill <qu...@gmail.com>.
I was not able to reproduce it by re-running the same job with an updated
kryo library.  The join doesn't do anything special.

On Sun, Dec 19, 2021 at 4:58 PM Dan Hill <qu...@gmail.com> wrote:

> I'll retry the job to see if it's reproducible. The serialized state is
> bad so that run keeps failing.

Re: Kryo EOFException: No more bytes left

Posted by Dan Hill <qu...@gmail.com>.
I'll retry the job to see if it's reproducible. The serialized state is bad
so that run keeps failing.

On Sun, Dec 19, 2021 at 4:28 PM Zhipeng Zhang <zh...@gmail.com>
wrote:

> Hi Dan,
>
> Could you provide a code snippet so that we can reproduce the bug here?

Re: Kryo EOFException: No more bytes left

Posted by Zhipeng Zhang <zh...@gmail.com>.
Hi Dan,

Could you provide a code snippet so that we can reproduce the bug here?

-- 
best,
Zhipeng