Posted to user@flink.apache.org by "Boden, Christoph" <ch...@tu-berlin.de> on 2016/02/25 11:24:04 UTC

loss of TaskManager

Dear Flink Community,

I am trying to fit a support vector machine classifier, using the CoCoA implementation provided in flink/ml/classification/, on a data set of moderate size (400k data points, 2000 features, approx. 12 GB) on a cluster of 25 nodes with 28 GB of memory each - and each worker node is given the full 28 GB via taskmanager.heap.mb.

With the standard configuration I constantly run into various flavors of JVM heap-space OutOfMemoryErrors, e.g.: "com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to serialize element. Serialized size (> 276647402 bytes) exceeds JVM heap space - Serialization trace: data (org.apache.flink.ml.math.SparseVector) ..."

As changing the degree of parallelism (DOP) did not change anything, I significantly reduced taskmanager.memory.fraction. With this I now reproducibly run into the following problem.
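For reference, both settings mentioned above live in flink-conf.yaml. The values below merely mirror the setup described in this thread (28 GB heap, reduced managed-memory fraction); they are not a recommendation:

```yaml
# flink-conf.yaml (key names as used by Flink at the time of this thread)
taskmanager.heap.mb: 28672        # JVM heap per TaskManager: the full 28 GB per node
taskmanager.memory.fraction: 0.3  # share of the heap reserved for Flink's managed memory
```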

After running for a while, the job fails with the following error:

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager  @ host slots - URL: akka.tcp://flink@url
2/user/taskmanager
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


However, the log of the TaskManager in question does not show any error or exception. The last log entry is:

2016-02-25 09:38:12,543 INFO  org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask  - finishing iteration [2]:  Combine (Reduce at org.apache.flink.ml.classification.SVM$$anon$25$$anonfun$6.apply(SVM.scala:392)) (3/91)

I am somewhat puzzled as to what could be the cause of this. Any help, or pointers to appropriate documentation, would be greatly appreciated.

I'll try increasing the heartbeat intervals next, but would still like to understand what goes wrong here.

Best regards,
Christoph


Re: loss of TaskManager

Posted by Till Rohrmann <tr...@apache.org>.
Hi Christoph,

have you tried setting the blocks parameter of the SVM algorithm? It
basically decides into how many blocks the input feature vectors are
split. The lower the value, the more feature vectors are grouped together
in each block and, thus, the larger each block becomes. Increasing this
value might solve the OOM exception.
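A minimal sketch of what this looks like with the FlinkML Scala API (method names as in the FlinkML API of that era; the input path, the blocks multiplier, and the iteration count are placeholders, not recommendations):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector

val env = ExecutionEnvironment.getExecutionEnvironment

// Load training data in LibSVM format (path is a placeholder).
val training: DataSet[LabeledVector] =
  MLUtils.readLibSVM(env, "hdfs:///path/to/train.libsvm")

val svm = SVM()
  // More blocks => fewer vectors per block => smaller blocks to serialize.
  .setBlocks(env.getParallelism * 4)
  .setIterations(10)

svm.fit(training)
```

With 400k data points, raising blocks from, say, 25 to 100 shrinks each block from roughly 16k to 4k vectors, which directly reduces the size of the serialized elements that are overflowing the heap here.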

Cheers,
Till

On Thu, Feb 25, 2016 at 1:06 PM, Boden, Christoph <
christoph.boden@tu-berlin.de> wrote:

> Hi Ufuk,
>
> thanks for the hint. Unfortunately I cannot access the system log on the
> remote machine. But I re-ran the job with a slightly increased
> memory.fraction (0.3 -> 0.4) and got an OutOfMemory exception again:
>
> [stack traces snipped]
>
> Is there anything else that can be done to alleviate this problem?
>
> Best and thanks,
> Christoph
>
> [earlier quoted messages snipped]

Re: loss of TaskManager

Posted by "Boden, Christoph" <ch...@tu-berlin.de>.
Hi Ufuk,

thanks for the hint. Unfortunately I cannot access the system log on the remote machine. But I re-ran the job with a slightly increased memory.fraction (0.3 -> 0.4) and got an OutOfMemory exception again:

cloud-25
Error: com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to serialize element. Serialized size (> 553259005 bytes) exceeds JVM heap space
Serialization trace:
data (org.apache.flink.ml.math.SparseVector)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:501)
at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:630)
at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:711)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:198)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:187)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:95)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:90)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:30)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:310)
at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 553259005 bytes) exceeds JVM heap space
at org.apache.flink.runtime.util.DataOutputSerializer.resize(DataOutputSerializer.java:288)
at org.apache.flink.runtime.util.DataOutputSerializer.write(DataOutputSerializer.java:117)
at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
... 36 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.flink.runtime.util.DataOutputSerializer.resize(DataOutputSerializer.java:284)
at org.apache.flink.runtime.util.DataOutputSerializer.write(DataOutputSerializer.java:117)
at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:501)
at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:630)
at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:711)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:198)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:187)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:95)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:90)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:30)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
cloud-22.
Error: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'cloud-25'.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:239)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:162)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: 
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
... 2 more

Is there anything else that can be done to alleviate this problem?

Best and thanks,
Christoph 
________________________________________
From: Ufuk Celebi <uc...@apache.org>
Sent: Thursday, 25 February 2016 12:35
To: user@flink.apache.org
Subject: Re: loss of TaskManager

Hey Chris!

I think that giving the full amount of memory to Flink leads to the TM
process being killed by the OS. Can you check the OS logs to see whether
the OOM killer shut it down? You should be able to find this in the
system logs.

– Ufuk
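Ufuk's check can be run along these lines. This is only a sketch: log file locations vary by distribution, and the quoted message format is typical for Linux kernels of that era, not guaranteed verbatim.

```shell
# Look for OOM-killer activity in the kernel ring buffer.
# A kill of the TaskManager JVM typically shows up as a line like:
#   Out of memory: Kill process 12345 (java) score 987 or sacrifice child
dmesg | grep -iE 'killed process|out of memory'

# On syslog-based systems the same messages are usually persisted here
# (whichever of the two files exists on the distribution at hand):
grep -iE 'killed process|out of memory' /var/log/syslog /var/log/messages 2>/dev/null
```

If either command prints a line naming a `java` process around the time the TaskManager disappeared, the OS killed it rather than the JVM crashing, which matches the empty TaskManager log.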


On Thu, Feb 25, 2016 at 11:24 AM, Boden, Christoph
<ch...@tu-berlin.de> wrote:
> Dear Flink Community,
>
> I am trying to fit a support vector machine classifier using the CoCoA implementation provided in flink/ml/classification/ on a data set of moderate size (400k data points, 2000 features, approx. 12 GB) on a cluster of 25 nodes with 28 GB of memory each, where each worker node is awarded the full 28 GB via taskmanager.heap.mb.
>
> With the standard configuration I constantly run into different variants of JVM heap-space OutOfMemory errors (e.g. com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to serialize element. Serialized size (> 276647402 bytes) exceeds JVM heap space - Serialization trace: data (org.apache.flink.ml.math.SparseVector) ...).
>
> As changing the degree of parallelism (DOP) did not change anything, I significantly reduced taskmanager.memory.fraction. With this I now (reproducibly) run into the following problem.
>
> After running for a while, the job fails with the following error:
>
> java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager  @ host slots - URL: akka.tcp://flink@url
> 2/user/taskmanager
>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>         at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
>         at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>         at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> However, the log of the task manager in question does not show any error or exception. The last entry is:
>
> 2016-02-25 09:38:12,543 INFO  org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask  - finishing iteration [2]:  Combine (Reduce at org.apache.flink.ml.classification.SVM$$anon$25$$anonfun$6.apply(SVM.scala:392)) (3/91)
>
> I am somewhat puzzled as to what the cause of this could be. Any help, or pointers to the appropriate documentation, would be greatly appreciated.
>
> I'll try increasing the heartbeat intervals next, but would still like to understand what goes wrong here.
>
> Best regards,
> Christoph
>
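For the heartbeat experiment mentioned in the original mail, the relevant knobs live in conf/flink-conf.yaml. The keys below match the Flink configuration reference of that era (0.10/1.0); the concrete values are purely illustrative assumptions, not recommendations:

```yaml
# conf/flink-conf.yaml -- illustrative values only

# Leave part of the node's 28 GB to the OS and other processes instead of
# handing the full amount to the TaskManager heap (avoids the OOM killer):
taskmanager.heap.mb: 24576

# Fraction of the heap reserved for Flink's managed memory:
taskmanager.memory.fraction: 0.5

# Tolerate longer GC pauses before a TaskManager is declared dead:
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 100 s
akka.ask.timeout: 100 s
```

Raising the heartbeat pause only hides the symptom if the TaskManager is genuinely stalled or killed; the heap and memory-fraction settings address the underlying pressure.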
