You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by zhijiang <wa...@aliyun.com> on 2018/11/22 09:08:06 UTC

回复:OutOfMemoryError while doing join operation in flink

Hi Akshay,

You encountered an existing issue for serializing large records to cause OOM.

Every subpartition would create a separate serializer before, and each serializer would maintain an internal bytes array for storing intermediate serialization results. The key point is that these overhead internal bytes array are not managed by framework, and their size would exceed with the record size dynamically. If your job has many subpartitions with large records, it may probably cause OOM issue.

I already improved this issue to some extent by sharing only one serializer for all subpartitions [1], that means we only have one bytes array overhead at most. This issue is covered in release-1.7.
Currently the best option may reduce your record size if possible or you can increase the heap size of task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
------------------------------------------------------------------
发件人:Akshay Mendole <ak...@gmail.com>
发送时间:2018年11月22日(星期四) 13:43
收件人:user <us...@flink.apache.org>
主 题:OutOfMemoryError while doing join operation in flink

Hi,
    We are converting one of our pig pipelines to flink using apache beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches them, joins them and dumps back to hdfs. The data set R1 is skewed. In a sense, it has few keys with lot of records. When we converted the pig pipeline to apache beam and ran it using flink on a production yarn cluster, we got the following error 

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
        at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)


From the exception view in flink job manager dashboard, we could see that this is happening at a join operation. 
When I say R1 dataset is skewed, there are some keys with number of occurrences as high as 8,000,000 , while most of the keys occur just once.
Dataset R2 has records with keys occurring at most once.
Also, if we exclude such keys which has high number of occurrences, the pipeline runs absolutely fine which proves it is happening due these few keys only.

Hadoop version : 2.7.1
Beam verision : 2.8.0
Flink Runner version : 2.8.0

Let me know what more information should I fetch and post here in order for you to help me resolve this.

Thanks,
Akshay



Re: OutOfMemoryError while doing join operation in flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink handles large data volumes quite well, large records are a bit more
tricky to tune.
You could try to reduce the number of parallel tasks per machine (#slots
per TM, #TMs per machine) and/or increase the amount of available JVM
memory (possible in exchange for managed memory as Zhijiang suggested).

Best, Fabian

Am Mi., 28. Nov. 2018 um 07:44 Uhr schrieb Akshay Mendole <
akshaymendole@gmail.com>:

> Hi Zhijiang,
>                   Thanks for the explanation and the workaround suggested.
> While this can work for the example stated above, we have more complex use
> cases where we would have to re-tune the above parameters. FYI, we ran into
> same problems when we did a simple groupBy on the skewed dataset.
> Thanks,
> Akshay
>
>
> On Fri, Nov 23, 2018 at 8:29 AM zhijiang <wa...@aliyun.com>
> wrote:
>
>> Hi Akshay,
>>
>> Sorrry I have not thought of a proper way to handle single large record
>> in distributed task managers in flink. But I can give some hints for
>> adjusting the related memories for work around OOM issue.
>> Large fraction of memories in task manager are managed by flink for
>> efficiency, and these memories are long live persistent in JVM not recycled
>> by gc. You can check the parameter "taskmanager.memory.fraction" for this
>> and the default value is 0.7 if you have not changed, that means 7GB * 0.7
>> are used by framework.
>>
>> I am not sure what is the flink version you used. If I rememberd
>> correctly, before release-1.5 the network buffers also uses heap memories
>> by default, so you should also minus this part of memory from total task
>> manager memory.
>>
>> If not considering network buffer used by framework, you only leave 7GB *
>> 0.3 temporaray memories for other parts. The temporaray memories in
>> serializer will exceed twice as current size every time if not covering the
>> record size, that means one serializer may need 2GB overhead memories for
>> your 1GB record. You have 2 slots per task manager for running two tasks,
>> so the total overhead memories may need 4GB almost. So you can decrease
>> the "taskmanager.memory.fraction" in low fraction or increase the total
>> task manager to cover this overhead memories, or set one slot for each task
>> manager.
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> 发件人:Akshay Mendole <ak...@gmail.com>
>> 发送时间:2018年11月23日(星期五) 02:54
>> 收件人:trohrmann <tr...@apache.org>
>> 抄 送:zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>;
>> Shreesha Madogaran <ms...@gmail.com>
>> 主 题:Re: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>>     Thanks for your reply. I tried running a simple "group by" on just
>> one dataset where few keys are repeatedly occurring (in order of millions)
>> and did not include any joins. I wanted to see if this issue is specific to
>> join. But as I was expecting, I ran into the same issue. I am giving 7GBs
>> to each task manager with 2 slots per task manager. From what I understood
>> so far, such cases where individual records somewhere in the pipeline
>> become so large that they should be handled in distributed manner instead
>> of handling them by a simple data structure in single JVM. I am guessing
>> there is no way to do this in Flink today.
>> Could you please confirm this?
>> Thanks,
>> Akshay
>>
>>
>> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>> Hi Akshay,
>>
>> Flink currently does not support to automatically distribute hot keys
>> across different JVMs. What you can do is to adapt the parallelism/number
>> of partitions manually if you encounter that one partition contains a lot
>> of hot keys. This might mitigate the problem by partitioning the hot keys
>> into different partitions.
>>
>> Apart from that, the problem seems to be as Zhijiang indicated that your
>> join result is quite large. One record is 1 GB large. Try to decrease it or
>> give more memory to your TMs.
>>
>> Cheers,
>> Till
>>
>> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <ak...@gmail.com>
>> wrote:
>> Hi Zhijiang,
>>                  Thanks for the quick reply. My concern is more towards
>> how flink perform joins of two *skewed *datasets. Pig
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the
>> join of skewed datasets. The record size that you are mentioning about in
>> your reply is after join operation takes place which is definitely going to
>> be huge enough not to fit in jvm task manager task slot in my use case. We
>> want to know if there is a way in flink to handle such skewed keys by
>> distributing their values across different jvms. Let me know if you need
>> more clarity on the issue.
>> Thanks,
>> Akshay
>>
>> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wa...@aliyun.com>
>> wrote:
>> Hi Akshay,
>>
>> You encountered an existing issue for serializing large records to cause
>> OOM.
>>
>> Every subpartition would create a separate serializer before, and each
>> serializer would maintain an internal bytes array for storing intermediate
>> serialization results. The key point is that these overhead internal bytes
>> array are not managed by framework, and their size would exceed with the
>> record size dynamically. If your job has many subpartitions with large
>> records, it may probably cause OOM issue.
>>
>> I already improved this issue to some extent by sharing only one
>> serializer for all subpartitions [1], that means we only have one bytes
>> array overhead at most. This issue is covered in release-1.7.
>> Currently the best option may reduce your record size if possible or you
>> can increase the heap size of task manager container.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> 发件人:Akshay Mendole <ak...@gmail.com>
>> 发送时间:2018年11月22日(星期四) 13:43
>> 收件人:user <us...@flink.apache.org>
>> 主 题:OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>>     We are converting one of our pig pipelines to flink using apache
>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>> skewed. In a sense, it has few keys with lot of records. When we converted
>> the pig pipeline to apache beam and ran it using flink on a production yarn
>> cluster, we got the following error
>>
>> 2018-11-21 16:52:25,307 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>> JVM heap space
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>         at
>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>         at
>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>         at
>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>         at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>> size (> 1136656562 bytes) exceeds JVM heap space
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>         at
>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>         at
>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>         at
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>         at
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>         at
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>         ... 9 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>         at
>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>         at
>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>         at
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>         at
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>         at
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>
>>
>> From the exception view in flink job manager dashboard, we could see that
>> this is happening at a join operation.
>> *When I say R1 dataset is skewed, there are some keys with number of
>> occurrences as high as 8,000,000 , while most of the keys occur just once.*
>> *Dataset R2 has records with keys occurring at most once.*
>> Also, if we exclude such keys which has high number of occurrences, the
>> pipeline runs absolutely fine which proves it is happening due these few
>> keys only.
>>
>> Hadoop version : 2.7.1
>> Beam verision : 2.8.0
>> Flink Runner version : 2.8.0
>>
>> Let me know what more information should I fetch and post here in order
>> for you to help me resolve this.
>>
>> Thanks,
>> Akshay
>>
>>
>>
>>

Re: OutOfMemoryError while doing join operation in flink

Posted by Akshay Mendole <ak...@gmail.com>.
Hi Zhijiang,
                  Thanks for the explanation and the workaround suggested.
While this can work for the example stated above, we have more complex use
cases where we would have to re-tune the above parameters. FYI, we ran into
same problems when we did a simple groupBy on the skewed dataset.
Thanks,
Akshay


On Fri, Nov 23, 2018 at 8:29 AM zhijiang <wa...@aliyun.com> wrote:

> Hi Akshay,
>
> Sorrry I have not thought of a proper way to handle single large record in
> distributed task managers in flink. But I can give some hints for adjusting
> the related memories for work around OOM issue.
> Large fraction of memories in task manager are managed by flink for
> efficiency, and these memories are long live persistent in JVM not recycled
> by gc. You can check the parameter "taskmanager.memory.fraction" for this
> and the default value is 0.7 if you have not changed, that means 7GB * 0.7
> are used by framework.
>
> I am not sure what is the flink version you used. If I rememberd
> correctly, before release-1.5 the network buffers also uses heap memories
> by default, so you should also minus this part of memory from total task
> manager memory.
>
> If not considering network buffer used by framework, you only leave 7GB *
> 0.3 temporaray memories for other parts. The temporaray memories in
> serializer will exceed twice as current size every time if not covering the
> record size, that means one serializer may need 2GB overhead memories for
> your 1GB record. You have 2 slots per task manager for running two tasks,
> so the total overhead memories may need 4GB almost. So you can decrease
> the "taskmanager.memory.fraction" in low fraction or increase the total
> task manager to cover this overhead memories, or set one slot for each task
> manager.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> 发件人:Akshay Mendole <ak...@gmail.com>
> 发送时间:2018年11月23日(星期五) 02:54
> 收件人:trohrmann <tr...@apache.org>
> 抄 送:zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>;
> Shreesha Madogaran <ms...@gmail.com>
> 主 题:Re: OutOfMemoryError while doing join operation in flink
>
> Hi,
>     Thanks for your reply. I tried running a simple "group by" on just one
> dataset where few keys are repeatedly occurring (in order of millions)  and
> did not include any joins. I wanted to see if this issue is specific to
> join. But as I was expecting, I ran into the same issue. I am giving 7GBs
> to each task manager with 2 slots per task manager. From what I understood
> so far, such cases where individual records somewhere in the pipeline
> become so large that they should be handled in distributed manner instead
> of handling them by a simple data structure in single JVM. I am guessing
> there is no way to do this in Flink today.
> Could you please confirm this?
> Thanks,
> Akshay
>
>
> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <tr...@apache.org>
> wrote:
> Hi Akshay,
>
> Flink currently does not support to automatically distribute hot keys
> across different JVMs. What you can do is to adapt the parallelism/number
> of partitions manually if you encounter that one partition contains a lot
> of hot keys. This might mitigate the problem by partitioning the hot keys
> into different partitions.
>
> Apart from that, the problem seems to be as Zhijiang indicated that your
> join result is quite large. One record is 1 GB large. Try to decrease it or
> give more memory to your TMs.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <ak...@gmail.com>
> wrote:
> Hi Zhijiang,
>                  Thanks for the quick reply. My concern is more towards
> how flink perform joins of two *skewed *datasets. Pig
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join
> of skewed datasets. The record size that you are mentioning about in your
> reply is after join operation takes place which is definitely going to be
> huge enough not to fit in jvm task manager task slot in my use case. We
> want to know if there is a way in flink to handle such skewed keys by
> distributing their values across different jvms. Let me know if you need
> more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wa...@aliyun.com>
> wrote:
> Hi Akshay,
>
> You encountered an existing issue for serializing large records to cause
> OOM.
>
> Every subpartition would create a separate serializer before, and each
> serializer would maintain an internal bytes array for storing intermediate
> serialization results. The key point is that these overhead internal bytes
> array are not managed by framework, and their size would exceed with the
> record size dynamically. If your job has many subpartitions with large
> records, it may probably cause OOM issue.
>
> I already improved this issue to some extent by sharing only one
> serializer for all subpartitions [1], that means we only have one bytes
> array overhead at most. This issue is covered in release-1.7.
> Currently the best option may reduce your record size if possible or you
> can increase the heap size of task manager container.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9913
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Akshay Mendole <ak...@gmail.com>
> 发送时间:2018年11月22日(星期四) 13:43
> 收件人:user <us...@flink.apache.org>
> 主 题:OutOfMemoryError while doing join operation in flink
>
> Hi,
>     We are converting one of our pig pipelines to flink using apache beam.
> The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
> enriches them, joins them and dumps back to hdfs. The data set R1 is
> skewed. In a sense, it has few keys with lot of records. When we converted
> the pig pipeline to apache beam and ran it using flink on a production yarn
> cluster, we got the following error
>
> 2018-11-21 16:52:25,307 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception:
> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
> JVM heap space
>         at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>         at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>         at
> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>         at
> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>         at
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized
> size (> 1136656562 bytes) exceeds JVM heap space
>         at
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>         at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>         ... 9 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>         at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>
>
> From the exception view in flink job manager dashboard, we could see that
> this is happening at a join operation.
> *When I say R1 dataset is skewed, there are some keys with number of
> occurrences as high as 8,000,000 , while most of the keys occur just once.*
> *Dataset R2 has records with keys occurring at most once.*
> Also, if we exclude such keys which has high number of occurrences, the
> pipeline runs absolutely fine which proves it is happening due these few
> keys only.
>
> Hadoop version : 2.7.1
> Beam verision : 2.8.0
> Flink Runner version : 2.8.0
>
> Let me know what more information should I fetch and post here in order
> for you to help me resolve this.
>
> Thanks,
> Akshay
>
>
>
>

回复:OutOfMemoryError while doing join operation in flink

Posted by zhijiang <wa...@aliyun.com>.
Hi Akshay,

Sorrry I have not thought of a proper way to handle single large record in distributed task managers in flink. But I can give some hints for adjusting the related memories for work around OOM issue.
Large fraction of memories in task manager are managed by flink for efficiency, and these memories are long live persistent in JVM not recycled by gc. You can check the parameter "taskmanager.memory.fraction" for this and the default value is 0.7 if you have not changed, that means 7GB * 0.7 are used by framework.

I am not sure what is the flink version you used. If I rememberd correctly, before release-1.5 the network buffers also uses heap memories by default, so you should also minus this part of memory from total task manager memory.

If not considering network buffer used by framework, you only leave 7GB * 0.3 temporaray memories for other parts. The temporaray memories in serializer will exceed twice as current size every time if not covering the record size, that means one serializer may need 2GB overhead memories for your 1GB record. You have 2 slots per task manager for running two tasks, so the total overhead memories may need 4GB almost. So you can decrease the "taskmanager.memory.fraction" in low fraction or increase the total task manager to cover this overhead memories, or set one slot for each task manager. 

Best,
Zhijiang


------------------------------------------------------------------
发件人:Akshay Mendole <ak...@gmail.com>
发送时间:2018年11月23日(星期五) 02:54
收件人:trohrmann <tr...@apache.org>
抄 送:zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>; Shreesha Madogaran <ms...@gmail.com>
主 题:Re: OutOfMemoryError while doing join operation in flink

Hi,
    Thanks for your reply. I tried running a simple "group by" on just one dataset where few keys are repeatedly occurring (in order of millions)  and did not include any joins. I wanted to see if this issue is specific to join. But as I was expecting, I ran into the same issue. I am giving 7GBs to each task manager with 2 slots per task manager. From what I understood so far, such cases where individual records somewhere in the pipeline become so large that they should be handled in distributed manner instead of handling them by a simple data structure in single JVM. I am guessing there is no way to do this in Flink today. 
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <tr...@apache.org> wrote:
Hi Akshay,

Flink currently does not support to automatically distribute hot keys across different JVMs. What you can do is to adapt the parallelism/number of partitions manually if you encounter that one partition contains a lot of hot keys. This might mitigate the problem by partitioning the hot keys into different partitions.

Apart from that, the problem seems to be as Zhijiang indicated that your join result is quite large. One record is 1 GB large. Try to decrease it or give more memory to your TMs.

Cheers,
Till
On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <ak...@gmail.com> wrote:
Hi Zhijiang,
                 Thanks for the quick reply. My concern is more towards how flink perform joins of two skewed datasets. Pig and spark seems to support the join of skewed datasets. The record size that you are mentioning about in your reply is after join operation takes place which is definitely going to be huge enough not to fit in jvm task manager task slot in my use case. We want to know if there is a way in flink to handle such skewed keys by distributing their values across different jvms. Let me know if you need more clarity on the issue.
Thanks, 
Akshay 
On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wa...@aliyun.com> wrote:
Hi Akshay,

You encountered an existing issue for serializing large records to cause OOM.

Every subpartition would create a separate serializer before, and each serializer would maintain an internal bytes array for storing intermediate serialization results. The key point is that these overhead internal bytes array are not managed by framework, and their size would exceed with the record size dynamically. If your job has many subpartitions with large records, it may probably cause OOM issue.

I already improved this issue to some extent by sharing only one serializer for all subpartitions [1], that means we only have one bytes array overhead at most. This issue is covered in release-1.7.
Currently the best option may reduce your record size if possible or you can increase the heap size of task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
------------------------------------------------------------------
发件人:Akshay Mendole <ak...@gmail.com>
发送时间:2018年11月22日(星期四) 13:43
收件人:user <us...@flink.apache.org>
主 题:OutOfMemoryError while doing join operation in flink

Hi,
    We are converting one of our pig pipelines to flink using apache beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches them, joins them and dumps back to hdfs. The data set R1 is skewed. In a sense, it has few keys with lot of records. When we converted the pig pipeline to apache beam and ran it using flink on a production yarn cluster, we got the following error 

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
        at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)


From the exception view in flink job manager dashboard, we could see that this is happening at a join operation. 
When I say R1 dataset is skewed, there are some keys with number of occurrences as high as 8,000,000 , while most of the keys occur just once.
Dataset R2 has records with keys occurring at most once.
Also, if we exclude such keys which has high number of occurrences, the pipeline runs absolutely fine which proves it is happening due these few keys only.

Hadoop version : 2.7.1
Beam verision : 2.8.0
Flink Runner version : 2.8.0

Let me know what more information should I fetch and post here in order for you to help me resolve this.

Thanks,
Akshay




Re: OutOfMemoryError while doing join operation in flink

Posted by Ken Krugler <kk...@transpac.com>.
Hi Akshay,

I don’t know much about the Beam/Flink integration, but I’m curious why you have a single record that would contain all 8M records with the same key.

E.g. the code for your simple “group by” test would be interesting.

— Ken


> On Nov 22, 2018, at 10:54 AM, Akshay Mendole <ak...@gmail.com> wrote:
> 
> Hi,
>     Thanks for your reply. I tried running a simple "group by" on just one dataset where few keys are repeatedly occurring (in order of millions)  and did not include any joins. I wanted to see if this issue is specific to join. But as I was expecting, I ran into the same issue. I am giving 7GBs to each task manager with 2 slots per task manager. From what I understood so far, such cases where individual records somewhere in the pipeline become so large that they should be handled in distributed manner instead of handling them by a simple data structure in single JVM. I am guessing there is no way to do this in Flink today. 
> Could you please confirm this?
> Thanks,
> Akshay
> 
> 
> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <trohrmann@apache.org <ma...@apache.org>> wrote:
> Hi Akshay,
> 
> Flink currently does not support to automatically distribute hot keys across different JVMs. What you can do is to adapt the parallelism/number of partitions manually if you encounter that one partition contains a lot of hot keys. This might mitigate the problem by partitioning the hot keys into different partitions.
> 
> Apart from that, the problem seems to be as Zhijiang indicated that your join result is quite large. One record is 1 GB large. Try to decrease it or give more memory to your TMs.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <akshaymendole@gmail.com <ma...@gmail.com>> wrote:
> Hi Zhijiang,
>                  Thanks for the quick reply. My concern is more towards how flink perform joins of two skewed datasets. Pig <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join of skewed datasets. The record size that you are mentioning about in your reply is after join operation takes place which is definitely going to be huge enough not to fit in jvm task manager task slot in my use case. We want to know if there is a way in flink to handle such skewed keys by distributing their values across different jvms. Let me know if you need more clarity on the issue.
> Thanks, 
> Akshay 
> 
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wangzhijiang999@aliyun.com <ma...@aliyun.com>> wrote:
> Hi Akshay,
> 
> You encountered an existing issue for serializing large records to cause OOM.
> 
> Every subpartition would create a separate serializer before, and each serializer would maintain an internal bytes array for storing intermediate serialization results. The key point is that these overhead internal bytes array are not managed by framework, and their size would exceed with the record size dynamically. If your job has many subpartitions with large records, it may probably cause OOM issue.
> 
> I already improved this issue to some extent by sharing only one serializer for all subpartitions [1], that means we only have one bytes array overhead at most. This issue is covered in release-1.7.
> Currently the best option may reduce your record size if possible or you can increase the heap size of task manager container.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-9913 <https://issues.apache.org/jira/browse/FLINK-9913>
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Akshay Mendole <akshaymendole@gmail.com <ma...@gmail.com>>
> 发送时间:2018年11月22日(星期四) 13:43
> 收件人:user <user@flink.apache.org <ma...@flink.apache.org>>
> 主 题:OutOfMemoryError while doing join operation in flink
> 
> Hi,
>     We are converting one of our pig pipelines to flink using apache beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches them, joins them and dumps back to hdfs. The data set R1 is skewed. In a sense, it has few keys with lot of records. When we converted the pig pipeline to apache beam and ran it using flink on a production yarn cluster, we got the following error 
> 
> 2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
>         at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>         at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>         at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>         at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>         at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
>         at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>         at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>         ... 9 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>         at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> 
> 
> From the exception view in flink job manager dashboard, we could see that this is happening at a join operation. 
> When I say R1 dataset is skewed, there are some keys with number of occurrences as high as 8,000,000 , while most of the keys occur just once.
> Dataset R2 has records with keys occurring at most once.
> Also, if we exclude such keys which has high number of occurrences, the pipeline runs absolutely fine which proves it is happening due these few keys only.
> 
> Hadoop version : 2.7.1
> Beam verision : 2.8.0
> Flink Runner version : 2.8.0
> 
> Let me know what more information should I fetch and post here in order for you to help me resolve this.
> 
> Thanks,
> Akshay
> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: OutOfMemoryError while doing join operation in flink

Posted by Akshay Mendole <ak...@gmail.com>.
Hi,
    Thanks for your reply. I tried running a simple "group by" on just one
dataset where few keys are repeatedly occurring (in order of millions)  and
did not include any joins. I wanted to see if this issue is specific to
join. But as I was expecting, I ran into the same issue. I am giving 7GBs
to each task manager with 2 slots per task manager. From what I understood
so far, such cases where individual records somewhere in the pipeline
become so large that they should be handled in distributed manner instead
of handling them by a simple data structure in single JVM. I am guessing
there is no way to do this in Flink today.
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Akshay,
>
> Flink currently does not support to automatically distribute hot keys
> across different JVMs. What you can do is to adapt the parallelism/number
> of partitions manually if you encounter that one partition contains a lot
> of hot keys. This might mitigate the problem by partitioning the hot keys
> into different partitions.
>
> Apart from that, the problem seems to be as Zhijiang indicated that your
> join result is quite large. One record is 1 GB large. Try to decrease it or
> give more memory to your TMs.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <ak...@gmail.com>
> wrote:
>
>> Hi Zhijiang,
>>                  Thanks for the quick reply. My concern is more towards
>> how flink perform joins of two *skewed *datasets. Pig
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
>> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the
>> join of skewed datasets. The record size that you are mentioning about in
>> your reply is after join operation takes place which is definitely going to
>> be huge enough not to fit in jvm task manager task slot in my use case. We
>> want to know if there is a way in flink to handle such skewed keys by
>> distributing their values across different jvms. Let me know if you need
>> more clarity on the issue.
>> Thanks,
>> Akshay
>>
>> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wa...@aliyun.com>
>> wrote:
>>
>>> Hi Akshay,
>>>
>>> You encountered an existing issue for serializing large records to cause
>>> OOM.
>>>
>>> Every subpartition would create a separate serializer before, and each
>>> serializer would maintain an internal bytes array for storing intermediate
>>> serialization results. The key point is that these overhead internal bytes
>>> array are not managed by framework, and their size would exceed with the
>>> record size dynamically. If your job has many subpartitions with large
>>> records, it may probably cause OOM issue.
>>>
>>> I already improved this issue to some extent by sharing only one
>>> serializer for all subpartitions [1], that means we only have one bytes
>>> array overhead at most. This issue is covered in release-1.7.
>>> Currently the best option may reduce your record size if possible or you
>>> can increase the heap size of task manager container.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> 发件人:Akshay Mendole <ak...@gmail.com>
>>> 发送时间:2018年11月22日(星期四) 13:43
>>> 收件人:user <us...@flink.apache.org>
>>> 主 题:OutOfMemoryError while doing join operation in flink
>>>
>>> Hi,
>>>     We are converting one of our pig pipelines to flink using apache
>>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>>> skewed. In a sense, it has few keys with lot of records. When we converted
>>> the pig pipeline to apache beam and ran it using flink on a production yarn
>>> cluster, we got the following error
>>>
>>> 2018-11-21 16:52:25,307 ERROR
>>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>>> JVM heap space
>>>         at
>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>>         at
>>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>         at
>>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>>         at
>>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>>         at
>>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>>         at
>>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>>         at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>         at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>>> size (> 1136656562 bytes) exceeds JVM heap space
>>>         at
>>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>>>         at
>>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>>         at
>>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>>         at
>>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>>         at
>>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>>         at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>>         at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>         at
>>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>>         at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>>         at
>>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>>         at
>>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>         at
>>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>>         at
>>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>>         at
>>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>>         at
>>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>>         at
>>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>>         at
>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>>         at
>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>>         at
>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>>         at
>>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>>         at
>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>>         at
>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>>         at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>>         at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>>         at
>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>         ... 9 more
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>         at
>>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>>         at
>>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>>         at
>>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>>         at
>>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>>         at
>>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>>         at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>>         at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>         at
>>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>>         at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>>         at
>>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>>         at
>>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>         at
>>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>>         at
>>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>>         at
>>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>>         at
>>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>>         at
>>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>>         at
>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>>         at
>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>>         at
>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>>         at
>>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>>         at
>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>>         at
>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>>         at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>>         at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>>         at
>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>
>>>
>>> From the exception view in flink job manager dashboard, we could see
>>> that this is happening at a join operation.
>>> *When I say R1 dataset is skewed, there are some keys with number of
>>> occurrences as high as 8,000,000 , while most of the keys occur just once.*
>>> *Dataset R2 has records with keys occurring at most once.*
>>> Also, if we exclude such keys which has high number of occurrences, the
>>> pipeline runs absolutely fine which proves it is happening due these few
>>> keys only.
>>>
>>> Hadoop version : 2.7.1
>>> Beam verision : 2.8.0
>>> Flink Runner version : 2.8.0
>>>
>>> Let me know what more information should I fetch and post here in order
>>> for you to help me resolve this.
>>>
>>> Thanks,
>>> Akshay
>>>
>>>
>>>

Re: OutOfMemoryError while doing join operation in flink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Akshay,

Flink currently does not support to automatically distribute hot keys
across different JVMs. What you can do is to adapt the parallelism/number
of partitions manually if you encounter that one partition contains a lot
of hot keys. This might mitigate the problem by partitioning the hot keys
into different partitions.

Apart from that, the problem seems to be as Zhijiang indicated that your
join result is quite large. One record is 1 GB large. Try to decrease it or
give more memory to your TMs.

Cheers,
Till

On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <ak...@gmail.com>
wrote:

> Hi Zhijiang,
>                  Thanks for the quick reply. My concern is more towards
> how flink perform joins of two *skewed *datasets. Pig
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join
> of skewed datasets. The record size that you are mentioning about in your
> reply is after join operation takes place which is definitely going to be
> huge enough not to fit in jvm task manager task slot in my use case. We
> want to know if there is a way in flink to handle such skewed keys by
> distributing their values across different jvms. Let me know if you need
> more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wa...@aliyun.com>
> wrote:
>
>> Hi Akshay,
>>
>> You encountered an existing issue for serializing large records to cause
>> OOM.
>>
>> Every subpartition would create a separate serializer before, and each
>> serializer would maintain an internal bytes array for storing intermediate
>> serialization results. The key point is that these overhead internal bytes
>> array are not managed by framework, and their size would exceed with the
>> record size dynamically. If your job has many subpartitions with large
>> records, it may probably cause OOM issue.
>>
>> I already improved this issue to some extent by sharing only one
>> serializer for all subpartitions [1], that means we only have one bytes
>> array overhead at most. This issue is covered in release-1.7.
>> Currently the best option may reduce your record size if possible or you
>> can increase the heap size of task manager container.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> 发件人:Akshay Mendole <ak...@gmail.com>
>> 发送时间:2018年11月22日(星期四) 13:43
>> 收件人:user <us...@flink.apache.org>
>> 主 题:OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>>     We are converting one of our pig pipelines to flink using apache
>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>> skewed. In a sense, it has few keys with lot of records. When we converted
>> the pig pipeline to apache beam and ran it using flink on a production yarn
>> cluster, we got the following error
>>
>> 2018-11-21 16:52:25,307 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>> JVM heap space
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>         at
>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>         at
>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>         at
>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>         at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>> size (> 1136656562 bytes) exceeds JVM heap space
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>         at
>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>         at
>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>         at
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>         at
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>         at
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>         ... 9 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>         at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>         at
>> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>>         at
>> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at
>> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>>         at
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>>         at
>> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>>         at
>> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>>         at
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>>         at
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>>         at
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>         at
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>>         at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>>         at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>
>>
>> From the exception view in flink job manager dashboard, we could see that
>> this is happening at a join operation.
>> *When I say R1 dataset is skewed, there are some keys with number of
>> occurrences as high as 8,000,000 , while most of the keys occur just once.*
>> *Dataset R2 has records with keys occurring at most once.*
>> Also, if we exclude such keys which has high number of occurrences, the
>> pipeline runs absolutely fine which proves it is happening due these few
>> keys only.
>>
>> Hadoop version : 2.7.1
>> Beam verision : 2.8.0
>> Flink Runner version : 2.8.0
>>
>> Let me know what more information should I fetch and post here in order
>> for you to help me resolve this.
>>
>> Thanks,
>> Akshay
>>
>>
>>

Re: OutOfMemoryError while doing join operation in flink

Posted by Akshay Mendole <ak...@gmail.com>.
Hi Zhijiang,
                 Thanks for the quick reply. My concern is more towards how
flink perform joins of two *skewed *datasets. Pig
<https://wiki.apache.org/pig/PigSkewedJoinSpec> and spark
<https://wiki.apache.org/pig/PigSkewedJoinSpec> seems to support the join
of skewed datasets. The record size that you are mentioning about in your
reply is after join operation takes place which is definitely going to be
huge enough not to fit in jvm task manager task slot in my use case. We
want to know if there is a way in flink to handle such skewed keys by
distributing their values across different jvms. Let me know if you need
more clarity on the issue.
Thanks,
Akshay

On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wa...@aliyun.com> wrote:

> Hi Akshay,
>
> You encountered an existing issue for serializing large records to cause
> OOM.
>
> Every subpartition would create a separate serializer before, and each
> serializer would maintain an internal bytes array for storing intermediate
> serialization results. The key point is that these overhead internal bytes
> array are not managed by framework, and their size would exceed with the
> record size dynamically. If your job has many subpartitions with large
> records, it may probably cause OOM issue.
>
> I already improved this issue to some extent by sharing only one
> serializer for all subpartitions [1], that means we only have one bytes
> array overhead at most. This issue is covered in release-1.7.
> Currently the best option may reduce your record size if possible or you
> can increase the heap size of task manager container.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9913
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> 发件人:Akshay Mendole <ak...@gmail.com>
> 发送时间:2018年11月22日(星期四) 13:43
> 收件人:user <us...@flink.apache.org>
> 主 题:OutOfMemoryError while doing join operation in flink
>
> Hi,
>     We are converting one of our pig pipelines to flink using apache beam.
> The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
> enriches them, joins them and dumps back to hdfs. The data set R1 is
> skewed. In a sense, it has few keys with lot of records. When we converted
> the pig pipeline to apache beam and ran it using flink on a production yarn
> cluster, we got the following error
>
> 2018-11-21 16:52:25,307 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception:
> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
> JVM heap space
>         at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>         at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>         at
> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>         at
> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>         at
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized
> size (> 1136656562 bytes) exceeds JVM heap space
>         at
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>         at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>         ... 9 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>         at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>
>
> From the exception view in flink job manager dashboard, we could see that
> this is happening at a join operation.
> *When I say R1 dataset is skewed, there are some keys with number of
> occurrences as high as 8,000,000 , while most of the keys occur just once.*
> *Dataset R2 has records with keys occurring at most once.*
> Also, if we exclude such keys which has high number of occurrences, the
> pipeline runs absolutely fine which proves it is happening due these few
> keys only.
>
> Hadoop version : 2.7.1
> Beam verision : 2.8.0
> Flink Runner version : 2.8.0
>
> Let me know what more information should I fetch and post here in order
> for you to help me resolve this.
>
> Thanks,
> Akshay
>
>
>