Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/09/09 14:35:14 UTC

Job crashing with RowSerializer EOF exception

Hi,

Flink 1.13.2
Scala 2.12.7

Running an app in production, I'm running into the following exception that
frequently fails the job:

switched from RUNNING to FAILED with failure cause:
java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=2}
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:98)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at org.apache.flink.types.StringValue.readString(StringValue.java:781)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:345)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:72)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    ... 11

Deserialization logic for the rows seems to be failing with an EOF
exception. Any help on the best way to debug this or try to get more info
would be great.

Thanks.
-- 
Best Regards,
Yuval Itzchakov.

Re: Job crashing with RowSerializer EOF exception

Posted by Timo Walther <tw...@apache.org>.
I assume you are still using toAppendStream or toRetractStream? 
Otherwise I'm wondering where the RowSerializer is actually coming from. 
The new planner doesn't use a row serializer.

Debugging serializer issues is difficult. We need more information about 
the pipeline.

Regards,
Timo




Re: Job crashing with RowSerializer EOF exception

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Robert,

There's no custom Kryo serializer. It's a RowSerializer that is generating
the output of a Table -> DataStream conversion.


Re: Job crashing with RowSerializer EOF exception

Posted by Robert Metzger <rm...@apache.org>.
Hi Yuval,

EOF exceptions during serialization usually indicate that some serializer
in the serializer chain is broken.
What data type are you serializing? Does it include a type that is
serialized by a custom serializer, or by Kryo, ...?
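
To illustrate the general failure mode (not a diagnosis of this job), here is
a minimal sketch using only java.io. The class RowRoundTrip and its write/read
helpers are hypothetical stand-ins, not Flink APIs. The writer emits two
string fields, and a reader that expects three runs off the end of the buffer
and fails with an EOFException, much like a reader whose field layout does not
match the bytes that were written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class RowRoundTrip {

    // Hypothetical stand-in for a row serializer: writes each field as a UTF string.
    static byte[] write(String[] fields) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String field : fields) {
                out.writeUTF(field);
            }
        }
        return bytes.toByteArray();
    }

    // Hypothetical stand-in for the matching deserializer: reads exactly `arity` fields.
    static String[] read(byte[] data, int arity) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        String[] fields = new String[arity];
        for (int i = 0; i < arity; i++) {
            fields[i] = in.readUTF();
        }
        return fields;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = write(new String[] {"a", "b"});

        // Writer and reader agree on the layout: the round trip succeeds.
        System.out.println(Arrays.toString(read(data, 2)));

        // Reader expects one more field than was written: the buffer is exhausted.
        try {
            read(data, 3);
        } catch (EOFException e) {
            System.out.println("EOFException: reader expected more fields than were written");
        }
    }
}
```

A round-trip check of this kind, run against the actual serializer and a
sample record, is one way to narrow down whether the write and read sides
disagree on the layout.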
