Posted to user@flink.apache.org by Reinier Kip <rk...@bol.com> on 2017/08/30 12:00:10 UTC

EOFException related to memory segments during run of Beam pipeline on Flink

Hi all,

I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail. You will find the stacktrace near the bottom of this post (data censored).

I would not expect such a sudden failure just because the dataset apparently grows above a certain size. Doesn’t Flink spill data to disk when memory runs out? How do I deal with this unpredictable behaviour in a production situation? I’m running a clean Flink 1.3.2 with heap memory of 768MiB; the dataset size is in the tens of megabytes. The same root EOFException occurred in Flink 1.2.1. I will gladly provide more information where needed.
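
In flink-conf.yaml this amounts to something like the following (assuming the standard Flink 1.3 keys; everything else is left at its default):

    # Heap memory for the JobManager and TaskManager processes, in MiB
    jobmanager.heap.mb: 768
    taskmanager.heap.mb: 768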

If this is expected behaviour, I feel it should be documented, both with a more informative exception message and by managing user expectations in the guides. (I have not been able to find any information regarding this exception.)

Hoping that someone can enlighten me,

Reinier


08/30/2017 13:48:33     GroupReduce (GroupReduce at GroupByKey)(1/1) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:49)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:126)
        ... 19 more


Re: EOFException related to memory segments during run of Beam pipeline on Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Reinier,

this is in fact a bug that you stumbled upon.
In general, Flink works very well with larger data sets and little memory
and gracefully spills data to disk.
The problem in your case is caused by a wrapped exception.

Internally, Flink uses an EOFException to signal that the memory pool is
exhausted (in your case thrown by SimpleCollectingOutputView.nextSegment()).
Unfortunately, Beam's SerializableCoder wraps this exception in a
CoderException, which Flink does not know about and which will not be
detected as an EOFException by NormalizedKeySorter.write() (line 283).
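
For illustration, the encode path looks roughly like this (a paraphrased
sketch of the pattern, not the exact Beam source):

    @Override
    public void encode(T value, OutputStream outStream) throws IOException {
      try {
        // Java serialization writes into Flink's paged output view; when
        // the memory pool is exhausted, nextSegment() throws EOFException
        // somewhere below this call.
        ObjectOutputStream oos = new ObjectOutputStream(outStream);
        oos.writeObject(value);
        oos.flush();
      } catch (IOException e) {
        // EOFException is a subclass of IOException, so Flink's
        // end-of-memory signal is swallowed and re-wrapped here.
        throw new CoderException("unable to serialize record " + value, e);
      }
    }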

I think it is debatable whether this is an issue of Beam's Flink runner or
Flink itself.
In any case, it would be good if you could open an issue at Beam's JIRA to
track this problem.
A quick solution for your use case could be to use a custom coder that
forwards the EOFException instead of wrapping it, along these lines:
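
(A rough, untested sketch; EofForwardingCoder is an example name, not an
existing Beam class, and delegating to SerializableCoder is just one way
to do it.)

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.Serializable;

    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.coders.CustomCoder;
    import org.apache.beam.sdk.coders.SerializableCoder;

    // Delegates to SerializableCoder, but rethrows a wrapped EOFException
    // unchanged so that Flink's NormalizedKeySorter recognizes it and
    // spills to disk instead of failing the job.
    public class EofForwardingCoder<T extends Serializable> extends CustomCoder<T> {

      private final SerializableCoder<T> delegate;

      private EofForwardingCoder(Class<T> clazz) {
        this.delegate = SerializableCoder.of(clazz);
      }

      public static <T extends Serializable> EofForwardingCoder<T> of(Class<T> clazz) {
        return new EofForwardingCoder<>(clazz);
      }

      @Override
      public void encode(T value, OutputStream outStream) throws IOException {
        try {
          delegate.encode(value, outStream);
        } catch (CoderException e) {
          // Unwrap Flink's end-of-memory signal instead of hiding it.
          if (e.getCause() instanceof EOFException) {
            throw (EOFException) e.getCause();
          }
          throw e;
        }
      }

      @Override
      public T decode(InputStream inStream) throws IOException {
        return delegate.decode(inStream);
      }
    }

You would then set it on the affected PCollection, e.g.
pcollection.setCoder(EofForwardingCoder.of(MyRecord.class)), where
MyRecord stands in for your own record class.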

Best, Fabian
