Posted to dev@flink.apache.org by Andra Lungu <lu...@gmail.com> on 2015/07/10 14:35:49 UTC

Passing around huge hash sets

Hey!

It appears that my jobs have the same memory issue disguised in different
exceptions. That is expected: I am passing around hash sets of neighbors, and
for skewed graphs (i.e. graphs where some vertices have a lot of neighbors)
it's bound to fail one way or the other.

This time Kryo hates me :(
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to serialize element. Serialized size (> 2166784 bytes) exceeds JVM heap space
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:261)
    at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer.write(DefaultSerializers.java:115)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer.write(DefaultSerializers.java:109)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeMapSerializer.write(DefaultSerializers.java:593)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeMapSerializer.write(DefaultSerializers.java:589)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at util.DummyGraph$ApplyCoGroupToVertexValuesTuple2.coGroup(DummyGraph.java:368)
    at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:722)


Is there anything I can do to increase the heap size?

Thanks in advance!
Andra

Re: Passing around huge hash sets

Posted by Stephan Ewen <se...@apache.org>.
There is also an inherent limit to this style of passing around massive data
about neighbors. Algorithms of this kind are often limited in their
scalability as well.

At some point you may need to switch to probabilistic data structures, or
simply to a different method.
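
To make the "probabilistic data structures" suggestion concrete, here is a minimal sketch of one such structure, a Bloom filter over vertex IDs. It is only an illustration: the class name NeighborBloomFilter, its parameters, and the assumption that vertex IDs are longs are all hypothetical, and nothing here is part of Flink or of the job in the stack trace. The point is that the filter has a fixed size no matter how many neighbors a vertex has, at the cost of occasional false positives.

```java
import java.util.BitSet;

/**
 * Minimal Bloom filter over long vertex IDs.
 * Hypothetical helper for illustration; not a Flink or Gelly class.
 */
final class NeighborBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    NeighborBloomFilter(int numBits, int numHashes) {
        if (numBits <= 0 || numHashes <= 0) {
            throw new IllegalArgumentException("numBits and numHashes must be positive");
        }
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** 64-bit mixing function (the MurmurHash3 finalizer) to spread out sequential IDs. */
    private static long mix(long x) {
        x ^= x >>> 33;
        x *= 0xff51afd7ed558ccdL;
        x ^= x >>> 33;
        x *= 0xc4ceb9fe1a85ec53L;
        x ^= x >>> 33;
        return x;
    }

    /** Bit position of the i-th probe, derived from two base hashes (double hashing). */
    private int position(long id, int i) {
        long h1 = mix(id);
        long h2 = mix(id ^ 0x9e3779b97f4a7c15L);
        return (int) Math.floorMod(h1 + i * h2, (long) numBits);
    }

    /** Records a neighbor ID by setting one bit per probe. */
    void add(long neighborId) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(neighborId, i));
        }
    }

    /** Returns false if the ID was definitely never added, true if it probably was. */
    boolean mightContain(long neighborId) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(neighborId, i))) {
                return false;
            }
        }
        return true;
    }

    /** Merges another filter built with the same parameters into this one. */
    void union(NeighborBloomFilter other) {
        if (other.numBits != numBits || other.numHashes != numHashes) {
            throw new IllegalArgumentException("filters must share numBits and numHashes");
        }
        bits.or(other.bits);
    }

    /** Nominal size of the bit array in bytes, independent of how many IDs were added. */
    int sizeInBytes() {
        return (numBits + 7) / 8;
    }
}
```

A filter like this can only answer "is this ID probably a neighbor?"; it cannot list the neighbors, so it fits algorithms that need membership tests rather than iteration over the set. Filters built with the same parameters can be merged with union, which is how neighborhoods would be combined instead of unioning hash sets.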

On Fri, Jul 10, 2015 at 2:54 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Yes, you can decrease the amount of managed memory and/or increase the TM
> heap size
>
> See
> - taskmanager.memory.size,
> - taskmanager.memory.fraction, and
> - taskmanager.heap.mb
>
> in
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html#common-options
>
> Cheers, Fabian
>

Re: Passing around huge hash sets

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, you can decrease the amount of managed memory and/or increase the
TaskManager (TM) heap size.

See
- taskmanager.memory.size,
- taskmanager.memory.fraction, and
- taskmanager.heap.mb

in

https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html#common-options
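
As a rough illustration of how these settings trade off, the sketch below estimates how much heap is left for user-code objects (such as the hash sets) once managed memory has been set aside. It assumes that taskmanager.memory.fraction is the share of the TaskManager heap reserved as managed memory and that taskmanager.memory.size is not set, which is how the linked page should be read. The class name HeapSplit and the numbers in the comments are made up, and the calculation deliberately ignores network buffers and JVM overhead.

```java
/**
 * Back-of-the-envelope heap split for one TaskManager.
 * Hypothetical helper; ignores network buffers and JVM overhead.
 */
final class HeapSplit {
    private HeapSplit() {
    }

    /**
     * Megabytes left for user-code objects when managedFraction of the heap
     * (taskmanager.heap.mb) is reserved as managed memory.
     */
    static long userHeapMb(long heapMb, double managedFraction) {
        if (heapMb < 0 || managedFraction < 0.0 || managedFraction > 1.0) {
            throw new IllegalArgumentException("heapMb must be >= 0 and fraction within [0, 1]");
        }
        long managedMb = (long) (heapMb * managedFraction);
        return heapMb - managedMb;
    }
}
```

For example, with a 4096 MB heap, lowering the fraction from 0.75 to 0.5 raises the estimate from 1024 MB to 2048 MB, and doubling the heap at a fixed fraction doubles it as well.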

Cheers, Fabian
