Posted to user@flink.apache.org by Gerard Garcia <ge...@talaia.io> on 2018/07/02 10:29:37 UTC

Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces
belong to the unresponsive task; that is why we suspect that at some point
it did not have enough memory to serialize the message and it blocked. I've
also found that when it hung several output buffers were full (see the
attached image buffers.outPoolUsage.png), so I guess the traces just reflect
that.

Probably the task hung for some other reason and that is what filled the
output buffers upstream of the blocked operator. I'll have to continue
investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
wangzhijiang999@aliyun.com> wrote:

>  Hi Gerard,
>
>     From the below stack, it can only indicate that the task is canceled, which
> may be triggered by the job manager because of another task's failure. If the task
> can not be interrupted within the configured timeout, the task manager process will
> be exited. Do you see any OutOfMemory messages in the task manager log?
> Normally the output serialization buffer is managed by the task manager
> framework and will not cause OOM, but on the input deserialization side
> there is a temp bytes array on each channel for holding partial
> records which is not managed by the framework. I think you can confirm whether
> and where the OOM was caused. Maybe check the task failure logs.
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: gerardg <ge...@talaia.io>
> Sent: Saturday, June 30, 2018 00:12
> To: user <us...@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> Our task bundles several thousand messages together so it creates some
> big single messages, which could explain why the operator hangs trying to
> serialize the message. Our problem is that when a task hangs it is very
> difficult to detect, and we have to manually cancel and restart it.
>
> Is there any way to make the task manager fail or to increase the memory
> available for the allocation?
>
> Thanks, Gerard
> ------------------------------
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>
>
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Gerard Garcia <ge...@talaia.io>.
Thanks Zhijiang,

Yes, I guess our best option right now is to just reduce the structure of
the output record and see if that solves the problem.
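A rough sketch of the direction we have in mind (class and field names
here are hypothetical, just to illustrate the idea of not serializing one
huge bundled record in a single emit):

    // instead of emitting one record that bundles thousands of messages,
    // flatten the bundle and emit the elements individually downstream
    import org.apache.flink.streaming.api.scala._

    case class Message(id: Int, payload: String)             // assumed element type
    case class Bundle(key: String, messages: List[Message])  // the "big" record we emit today

    def flatten(bundles: DataStream[Bundle]): DataStream[(String, Message)] =
      bundles.flatMap(b => b.messages.map(m => (b.key, m)))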

Gerard

On Tue, Jul 17, 2018 at 4:56 PM Zhijiang(wangzhijiang999) <
wangzhijiang999@aliyun.com> wrote:

> Hi Gerard,
>
> From the jstack you provided, the task is serializing the output record
> and during this process it will not process the input data any more.
> This stack does not indicate an out of memory issue. And if the output
> buffer were exhausted, the task would be blocked in the requestBufferBlocking
> process instead.
>
> I think the key point is that your output record is too large and has a
> complicated structure, because every field and collection in this class
> will be traversed to serialize it, which costs much time and CPU usage.
> Furthermore, the checkpoint can not be done because it is waiting for the
> lock which is also occupied by the task output process.
>
> As you mentioned, it makes sense to check the data structure of the output
> record and reduce its size or make it more lightweight to handle.
>
> Best,
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: Gerard Garcia <ge...@talaia.io>
> Sent: Tuesday, July 17, 2018 21:53
> To: piotr <pi...@data-artisans.com>
> Cc: fhueske <fh...@gmail.com>; wangzhijiang999 <wangzhijiang999@aliyun.com>; user <us...@flink.apache.org>; nico <nico@data-artisans.com>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record
> (probably too big, we have already started working to reduce its size)
> which consists of several case classes which have (among others) fields of
> type String.
>
> I attach a CPU profile of the thread stuck serializing. I also attach the
> memory and GC telemetry that the profiler shows (which maybe is more
> informative than the one recorded from the JVM metrics). Only one node was
> actually "doing something" all others had CPU usage near zero.
>
> The task is at the same time trying to perform a checkpoint but keeps
> failing. Would it make sense that the problem is that there is not enough
> memory available to perform the checkpoint so all operators are stuck
> waiting for it to finish, and at the same time, the operator stuck
> serializing is keeping all the memory so neither it nor the checkpoint can
> advance?
>
> I realized that I don't have a minimum pause between checkpoints so it is
> continuously trying. Maybe I can reduce the checkpoint timeout from the 10m
> default and introduce a minimum pause (e.g. 5m timeout and 5m minimum
> pause) and this way I could break the deadlock.
>
> Gerard
>
>
> On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> Hi,
>
> Thanks for the additional data. Just to make sure, are you using Flink
> 1.5.0?
>
> There are a couple of threads that seem to be looping in serialisation,
> while others are blocked and either waiting for new data or waiting for
> someone to consume some data. Could you debug or CPU profile the code, in
> particular focusing on threads with a stack trace like the one below [1]. Aren’t you
> trying to serialise some gigantic String?
>
> Piotrek
>
> [1]:
>
> "(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x00007f52584d2800 nid=0x6819
> runnable [0x00007f451a843000]
>    java.lang.Thread.State: RUNNABLE
> at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at (...)
> at (...)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> - locked <0x00007f4b5488f2b8> (a java.lang.Object)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
>
> On 16 Jul 2018, at 17:03, Gerard Garcia <ge...@talaia.io> wrote:
>
> Hi Piotr,
>
> I attach the GC pauses logged a while back when the task stopped
> processing for several hours (it stopped at about 20:05) and a jstack
> dump from the last time the task hung.
>
> Thanks,
>
> Gerard
>
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> Hi Gerard,
>
> I second what Zhijiang wrote. Please check GC pauses, either via GC
> logging, a 3rd party tool like jconsole (or some memory profiler), or via
> enabling resource logging in Flink.
>
> After confirming that this is not the issue, the next time this happens
> please collect thread dumps from the stuck process instead of cancelling
> the job.
>
> Piotrek
>
> On 16 Jul 2018, at 13:53, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Gerard,
>
> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have
> been working on the networking stack lately and might have some ideas
> regarding your issue.
>
> Best, Fabian
>
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com>:
> Hi Gerard,
>
> I thought before that a failed task triggered the cancel process; now I am clear
> that you cancel the task when it stops processing data.
> I think you can jstack the process to find where the task thread is blocked
> instead of canceling it, then we may find some hints.
>
> In addition, the following stack "DataOutputSerializer.resize" indicates
> the task is serializing the record, and there will be overhead byte buffers
> in the serializer for copying data temporarily. If your record is too
> large, it may cause OOM in this process, and this overhead memory is not
> managed by the Flink framework. You can also monitor the GC status to check
> for full GC delays.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Gerard Garcia <ge...@talaia.io>
> Sent: Friday, July 13, 2018 16:22
> To: wangzhijiang999 <wa...@aliyun.com>
> Cc: user <us...@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Hi Zhijiang,
>
> The problem is that no other task failed first. We have a task that
> sometimes just stops processing data, and when we cancel it, we see log
> messages saying:
>
> " Task (...) did not react to cancelling signal for 30 seconds, but is
> stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
>
> That is why we suspect that it hangs forever at that point and that is why
> it stops processing data. I don't see any increase in memory use in the
> heap (I guess because these buffers are managed by Flink) so I'm not sure
> if that is really the problem.
>
> Gerard
>
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com> wrote:
> Hi Gerard,
>
> I think you can check the job manager log to find which task failed
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during the
> canceling process, the blocked task that is waiting for an output buffer can
> not be interrupted by the
> canceler thread, which is what is shown in your description. So I think the cancel
> process is not the key point and is expected. Maybe it did not cause an
> OOM at all.
> If the task can not be interrupted during canceling, the task manager process
> will finally be exited to trigger restarting the job.
>
> Zhijiang
> ------------------------------------------------------------------
> From: Gerard Garcia <ge...@talaia.io>
> Sent: Monday, July 2, 2018 18:29
> To: wangzhijiang999 <wa...@aliyun.com>
> Cc: user <us...@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task; that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hung several output buffers were full (see the
> attached image buffers.outPoolUsage.png), so I guess the traces just reflect
> that.
>
> Probably the task hung for some other reason and that is what filled the
> output buffers upstream of the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com> wrote:
>  Hi Gerard,
>
>     From the below stack, it can only indicate that the task is canceled, which
> may be triggered by the job manager because of another task's failure. If the task
> can not be interrupted within the configured timeout, the task manager process will
> be exited. Do you see any OutOfMemory messages in the task manager log?
> Normally the output serialization buffer is managed by the task manager
> framework and will not cause OOM, but on the input deserialization side
> there is a temp bytes array on each channel for holding partial
> records which is not managed by the framework. I think you can confirm whether
> and where the OOM was caused. Maybe check the task failure logs.
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: gerardg <ge...@talaia.io>
> Sent: Saturday, June 30, 2018 00:12
> To: user <us...@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> Our task bundles several thousand messages together so it creates some
> big single messages, which could explain why the operator hangs trying to
> serialize the message. Our problem is that when a task hangs it is very
> difficult to detect, and we have to manually cancel and restart it.
>
> Is there any way to make the task manager fail or to increase the memory
> available for the allocation?
>
> Thanks, Gerard
> ------------------------------
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com <http://nabble.com/>.
>
>
>
>
>
> <Heap size.png><G1 Young|Old generation time.png><hang_jstack>
>
>
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
Hi Gerard,

From the jstack you provided, the task is serializing the output record and during this process it will not process the input data any more.
This stack does not indicate an out of memory issue. And if the output buffer were exhausted, the task would be blocked in the requestBufferBlocking process instead.

I think the key point is that your output record is too large and has a complicated structure, because every field and collection in this class will be traversed to serialize it, which costs much time and CPU usage. Furthermore, the checkpoint can not be done because it is waiting for the lock which is also occupied by the task output process.

As you mentioned, it makes sense to check the data structure of the output record and reduce its size or make it more lightweight to handle.
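
For illustration only (these are made-up class names, not the actual classes from the job), a record shaped roughly like this is what makes a single serialize() call so expensive, since every nested element is written one by one while the checkpoint lock is held:

    // hypothetical sketch of a "heavy" output record for the Flink Scala API
    case class Item(id: Int, tags: List[String])        // each String and each tag is serialized separately
    case class Group(name: String, items: List[Item])   // TraversableSerializer walks every Item
    case class BigRecord(groups: List[Group])           // thousands of nested elements => one very long serialize() call

Flattening such a structure, or emitting fewer and smaller nested collections per record, shortens the time the output path blocks both input processing and checkpointing.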

Best,

Zhijiang


------------------------------------------------------------------
From: Gerard Garcia <ge...@talaia.io>
Sent: Tuesday, July 17, 2018 21:53
To: piotr <pi...@data-artisans.com>
Cc: fhueske <fh...@gmail.com>; wangzhijiang999 <wa...@aliyun.com>; user <us...@flink.apache.org>; nico <ni...@data-artisans.com>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record (probably too big, we have already started working to reduce its size) which consists of several case classes which have (among others) fields of type String. 

I attach a CPU profile of the thread stuck serializing. I also attach the memory and GC telemetry that the profiler shows (which maybe is more informative than the one recorded from the JVM metrics). Only one node was actually "doing something"; all others had CPU usage near zero.

The task is at the same time trying to perform a checkpoint but keeps failing. Would it make sense that the problem is that there is not enough memory available to perform the checkpoint so all operators are stuck waiting for it to finish, and at the same time, the operator stuck serializing is keeping all the memory so neither it nor the checkpoint can advance? 

I realized that I don't have a minimum pause between checkpoints so it is continuously trying. Maybe I can reduce the checkpoint timeout from the 10m default and introduce a minimum pause (e.g. 5m timeout and 5m minimum pause) and this way I could break the deadlock.
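
A minimal sketch of what that change looks like with the checkpoint settings of the Scala StreamExecutionEnvironment (the interval is just a placeholder, the 5m values are the example ones mentioned above):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60 * 1000)                                    // checkpoint interval (placeholder)
    env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000)          // fail a checkpoint after 5m instead of the 10m default
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5 * 60 * 1000) // wait 5m after a checkpoint before starting the next one
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)               // never run overlapping checkpoints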

Gerard


On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
Hi,

Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?

There are a couple of threads that seem to be looping in serialisation, while others are blocked and either waiting for new data or waiting for someone to consume some data. Could you debug or CPU profile the code, in particular focusing on threads with a stack trace like the one below [1]. Aren’t you trying to serialise some gigantic String?

Piotrek

[1]:

"(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x00007f52584d2800 nid=0x6819 runnable [0x00007f451a843000]
   java.lang.Thread.State: RUNNABLE
 at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
 at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
 at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 at (...)
 at (...)
 at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 - locked <0x00007f4b5488f2b8> (a java.lang.Object)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
 at java.lang.Thread.run(Thread.java:748)

On 16 Jul 2018, at 17:03, Gerard Garcia <ge...@talaia.io> wrote:
Hi Piotr,

I attach the GC pauses logged a while back when the task stopped processing for several hours (it stopped at about 20:05) and a jstack dump from the last time the task hung.

Thanks,

Gerard
On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
Hi Gerard,

I second what Zhijiang wrote. Please check GC pauses, either via GC logging, a 3rd party tool like jconsole (or some memory profiler), or via enabling resource logging in Flink.
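
For example (the log path and exact flags are just an assumption for a Java 8 setup, adjust to your environment), GC logging for the Flink JVMs can be switched on through flink-conf.yaml:

    # append HotSpot GC logging flags to the JobManager/TaskManager JVMs
    env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"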

After confirming that this is not the issue, the next time this happens please collect thread dumps from the stuck process instead of cancelling the job.

Piotrek  

On 16 Jul 2018, at 13:53, Fabian Hueske <fh...@gmail.com> wrote:
Hi Gerard,

Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been working on the networking stack lately and might have some ideas regarding your issue.

Best, Fabian

2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <wa...@aliyun.com>:
Hi Gerard,

I thought before that a failed task triggered the cancel process; now I am clear that you cancel the task when it stops processing data.
I think you can jstack the process to find where the task thread is blocked instead of canceling it, then we may find some hints.
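
For example (a rough sketch; the exact process name to look for depends on how the TaskManager is started):

    # find the TaskManager JVM on the affected machine and dump all thread
    # stacks, without cancelling the job
    jps -l | grep -i taskmanager
    jstack <taskmanager-pid> > /tmp/taskmanager-threads.txt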

In addition, the following stack "DataOutputSerializer.resize" indicates the task is serializing the record, and there will be overhead byte buffers in the serializer for copying data temporarily. If your record is too large, it may cause OOM in this process, and this overhead memory is not managed by the Flink framework. You can also monitor the GC status to check for full GC delays.

Best,
Zhijiang
------------------------------------------------------------------
From: Gerard Garcia <ge...@talaia.io>
Sent: Friday, July 13, 2018 16:22
To: wangzhijiang999 <wa...@aliyun.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Hi Zhijiang,

The problem is that no other task failed first. We have a task that sometimes just stops processing data, and when we cancel it, we see log messages saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133) org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why it stops processing data. I don't see any increase in memory use in the heap (I guess because these buffers are managed by Flink) so I'm not sure if that is really the problem.

Gerard
On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Hi Gerard,

I think you can check the job manager log to find which task failed first, and then trace the task manager log containing the failed task to find the initial reason.
The failed task will trigger canceling all the other tasks, and during the canceling process, the blocked task that is waiting for an output buffer can not be interrupted by the
canceler thread, which is what is shown in your description. So I think the cancel process is not the key point and is expected. Maybe it did not cause an OOM at all.
If the task can not be interrupted during canceling, the task manager process will finally be exited to trigger restarting the job.

Zhijiang
------------------------------------------------------------------
From: Gerard Garcia <ge...@talaia.io>
Sent: Monday, July 2, 2018 18:29
To: wangzhijiang999 <wa...@aliyun.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong to the unresponsive task; that is why we suspect that at some point it did not have enough memory to serialize the message and it blocked. I've also found that when it hung several output buffers were full (see the attached image buffers.outPoolUsage.png), so I guess the traces just reflect that.

Probably the task hung for some other reason and that is what filled the output buffers upstream of the blocked operator. I'll have to continue investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
 Hi Gerard,

    From the below stack, it can only indicate that the task is canceled, which may be triggered by the job manager because of another task's failure. If the task can not be interrupted within the configured timeout, the task manager process will be exited. Do you see any OutOfMemory messages in the task manager log?  Normally the output serialization buffer is managed by the task manager framework and will not cause OOM, but on the input deserialization side there is a temp bytes array on each channel for holding partial records which is not managed by the framework. I think you can confirm whether and where the OOM was caused. Maybe check the task failure logs.

Zhijiang

------------------------------------------------------------------
From: gerardg <ge...@talaia.io>
Sent: Saturday, June 30, 2018 00:12
To: user <us...@flink.apache.org>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) java.nio.ByteBuffer.wrap(ByteBuffer.java:396) org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 

 Our task bundles several thousand messages together, so it creates some big single messages, which could explain why the operator hangs trying to serialize them. Our problem is that when a task hangs it is very difficult to detect, and we have to manually cancel and restart it. 

 Is there any way to make the task manager fail, or to increase the memory available for the allocation? 
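
(For reference, the configuration options that seem most relevant to these two questions, as of Flink 1.5; this is only a sketch and the values are illustrative placeholders, not recommendations:)

# flink-conf.yaml
# Make the TaskManager process exit (and the job restart) sooner when a task
# does not react to the cancellation signal:
task.cancellation.timeout: 60000
# Give the JVM heap, where DataOutputSerializer grows its byte[], more room:
taskmanager.heap.mb: 8192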

 Thanks, Gerard 
 Sent from the Apache Flink User Mailing List archive at Nabble.com.





<Heap size.png><G1 Young|Old generation time.png><hang_jstack>


Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Gerard Garcia <ge...@talaia.io>.
Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record
(probably too big, we have already started working to reduce its size)
which consists of several case classes which have (among others) fields of
type String.

I attach a CPU profile of the thread stuck serializing. I also attach the
memory and GC telemetry that the profiler shows (which may be more
informative than the one recorded from the JVM metrics). Only one node was
actually "doing something"; all the others had CPU usage near zero.

The task is at the same time trying to perform a checkpoint, but it keeps
failing. Would it make sense that the problem is that there is not enough
memory available to perform the checkpoint, so all operators are stuck
waiting for it to finish, while at the same time the operator stuck
serializing is holding all the memory, so neither it nor the checkpoint can
advance?

I realized that I don't have a minimum pause between checkpoints so it is
continuously trying. Maybe I can reduce the checkpoint timeout from the 10m
default and introduce a minimum pause (e.g. 5m timeout and 5m minimum
pause) and this way I could break the deadlock.
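
A minimal sketch of what that could look like (assuming a Scala StreamExecutionEnvironment named env; the interval and the 5m values are just the ones discussed above, not recommendations):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000L) // checkpoint interval (illustrative)
// Fail a checkpoint attempt after 5 minutes instead of the 10 minute default:
env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000L)
// Leave at least 5 minutes between the end of one checkpoint and the start of the next:
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5 * 60 * 1000L)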

Gerard


On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Thanks for the additional data. Just to make sure, are you using Flink
> 1.5.0?
>
> There are a couple of threads that seem to be looping in serialisation,
> while others are blocked, either waiting for new data or waiting for
> someone to consume some data. Could you debug or CPU profile the code, in
> particular focusing on threads with a stack trace as below [1]. Aren’t you
> trying to serialise some gigantic String?
>
> Piotrek
>
> [1]:
>
> "(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x00007f52584d2800 nid=0x6819
> runnable [0x00007f451a843000]
>    java.lang.Thread.State: RUNNABLE
> at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at (...)
> at (...)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> - locked <0x00007f4b5488f2b8> (a java.lang.Object)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
>
> On 16 Jul 2018, at 17:03, Gerard Garcia <ge...@talaia.io> wrote:
>
> Hi Piotr,
>
> I attach the GC pauses logged a while back when the task stopped
> processing for several hours (it stopped at about 20:05) and a jstack
> dump from the last time the task hung.
>
> Thanks,
>
> Gerard
>
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi Gerard,
>>
>> I second to what Zhijiang wrote. Please check GC pauses, either via GC
>> logging, 3rd party tool like jconsole (or some memory profiler) or via
>> enabling resource logging in Flink.
>>
>> After confirming that this is not the issue next time this happens,
>> instead of cancelling the job, please collect thread dumps on a process
>> that is stuck.
>>
>> Piotrek
>>
>> On 16 Jul 2018, at 13:53, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi Gerard,
>>
>> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have
>> been working on the networking stack lately and might have some ideas
>> regarding your issue.
>>
>> Best, Fabian
>>
>> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <
>> wangzhijiang999@aliyun.com>:
>>
>>> Hi Gerard,
>>>
>>> I thought the failed task triggers cancel process before, now I am clear
>>> that you cancel the task when it stops processing data.
>>> I think you can jstack the process to find where task thread is blocked
>>> instead of canceling it, then we may find some hints.
>>>
>>> In addition, the following stack "DataOutputSerializer.resize" indicates
>>> the task is serializing the record and there will be overhead byte buffers
>>> in the serializer for copying data temporarily. And if your record is too
>>> large, it may cause OOM in this process and this overhead memory is not
>>> managed by flink framework. Also you can monitor the gc status to check the
>>> full gc delay.
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From: Gerard Garcia <ge...@talaia.io>
>>> Sent: July 13, 2018 (Friday) 16:22
>>> To: wangzhijiang999 <wa...@aliyun.com>
>>> Cc: user <us...@flink.apache.org>
>>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>>>
>>> Hi Zhijiang,
>>>
>>> The problem is that no other task failed first. We have a task that
>>> sometimes just stops processing data, and when we cancel it, we see the
>>> log messages saying:
>>>
>>> " Task (...) did not react to cancelling signal for 30 seconds, but is
>>> stuck in method:
>>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>>> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
>>> (...)"
>>>
>>> That is why we suspect that it hangs forever at that point and that is
>>> why it stops processing data. I don't see any increase in memory use in the
>>> heap (I guess because these buffers are managed by Flink) so I'm not sure
>>> if that is really the problem.
>>>
>>> Gerard
>>>
>>> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
>>> wangzhijiang999@aliyun.com> wrote:
>>> Hi Gerard,
>>>
>>> I think you can check the job manager log to find which task failed at
>>> first, and then trace the task manager log containing the failed task to
>>> find the initial reason.
>>> The failed task will trigger canceling all the other tasks, and during
>>> canceling process, the blocked task that is waiting for output buffer can
>>> not be interrupted by the
>>> canceler thread which is shown in your description. So I think the
>>> cancel process is not the key point and is in expectation. Maybe it did not
>>> cause OOM at all.
>>> If the task can not be interrupted during canceling, the task manager process
>>> will be exited finally to trigger restarting the job.
>>>
>>> Zhijiang
>>> ------------------------------------------------------------------
>>> From: Gerard Garcia <ge...@talaia.io>
>>> Sent: July 2, 2018 (Monday) 18:29
>>> To: wangzhijiang999 <wa...@aliyun.com>
>>> Cc: user <us...@flink.apache.org>
>>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>>>
>>> Thanks Zhijiang,
>>>
>>> We haven't found any other relevant log messages anywhere. These traces
>>> belong to the unresponsive task, that is why we suspect that at some point
>>> it did not have enough memory to serialize the message and it blocked. I've
>>> also found that when it hanged several output buffers were full (see
>>> attached image buffers.outPoolUsage.png) so I guess the traces just reflect
>>> that.
>>>
>>> Probably the task hanged for some other reason and that is what filled
>>> the output buffers previous to the blocked operator. I'll have to continue
>>> investigating to find the real cause.
>>>
>>> Gerard
>>>
>>>
>>>
>>>
>>> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
>>> wangzhijiang999@aliyun.com> wrote:
>>>  Hi Gerard,
>>>
>>>     From the below stack, it can only indicate that the task is canceled, which
>>> may be triggered by the job manager because of another task failure. If the task
>>> can not be interrupted within the timeout config, the task manager process will
>>> be exited. Do you see any OutOfMemory messages from the task manager log?
>>> Normally the output serialization buffer is managed by the task manager
>>> framework and will not cause OOM, and on the input deserialization side,
>>> there will be a temp bytes array on each channel for holding partial
>>> records which is not managed by the framework. I think you can confirm whether
>>> and where the OOM occurred. Maybe check the task failure logs.
>>>
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From: gerardg <ge...@talaia.io>
>>> Sent: June 30, 2018 (Saturday) 00:12
>>> To: user <us...@flink.apache.org>
>>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>>>
>>> (fixed formatting)
>>>
>>> Hello,
>>>
>>> We have experienced some problems where a task just hangs without
>>> showing any kind of log error while other tasks running in the same task
>>> manager continue without problems. When these tasks are restarted the task
>>> manager gets killed and shows several errors similar to these ones:
>>>
>>> [Canceler/Interrupts for (...)' did not react to cancelling signal for
>>> 30 seconds, but is stuck in method:
>>> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
>>> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
>>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
>>> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>>> scala.collection.immutable.List.foreach(List.scala:392)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> (...)
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> (...)
>>> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>> [Canceler/Interrupts for (...)' did not react to cancelling signal for
>>> 30 seconds, but is stuck in method:
>>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>>> scala.collection.immutable.List.foreach(List.scala:392)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> (...)
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> (...)
>>> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>> Our task bundles several thousand messages together, so it creates
>>> some big single messages, which could explain why the operator hangs trying
>>> to serialize them. Our problem is that when a task hangs it is very
>>> difficult to detect, and we have to manually cancel and restart it.
>>>
>>> Is there any way to make the task manager fail, or to increase the memory
>>> available for the allocation?
>>>
>>> Thanks, Gerard
>>> ------------------------------
>>> Sent from the Apache Flink User Mailing List archive
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>> at Nabble.com.
>>>
>>>
>>>
>>>
>>
>> <Heap size.png><G1 Young|Old generation time.png><hang_jstack>
>
>
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?

There are a couple of threads that seem to be looping in serialisation, while others are blocked, either waiting for new data or waiting for someone to consume some data. Could you debug or CPU profile the code, in particular focusing on threads with a stack trace as below [1]. Aren’t you trying to serialise some gigantic String?

Piotrek

[1]:

"(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x00007f52584d2800 nid=0x6819 runnable [0x00007f451a843000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
	at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
	at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at (...)
	at (...)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	- locked <0x00007f4b5488f2b8> (a java.lang.Object)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
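
One rough way to check the "gigantic String" hypothesis is to measure how large a record actually gets when run through the same kind of serializer. The sketch below is only illustrative: MyRecord, the sample payload and the object name are made up, and in a real job the serializer would be created once (e.g. in open() of a rich function) rather than ad hoc.

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.DataOutputSerializer

object RecordSizeCheck {

  // Stand-in for the real case class that is being serialized.
  case class MyRecord(id: Int, payload: String)

  // Same style of serializer Flink derives for Scala case classes (CaseClassSerializer etc.).
  private val serializer =
    createTypeInformation[MyRecord].createSerializer(new ExecutionConfig)

  // Serialized size of one record, using the DataOutputSerializer that shows up
  // in the stack traces above (it grows its backing byte[] as needed).
  def serializedSize(record: MyRecord): Int = {
    val out = new DataOutputSerializer(128)
    serializer.serialize(record, out)
    out.length()
  }

  def main(args: Array[String]): Unit = {
    val sample = MyRecord(1, "x" * (16 * 1024 * 1024)) // deliberately huge payload
    println(s"serialized size: ${serializedSize(sample)} bytes")
  }
}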

> On 16 Jul 2018, at 17:03, Gerard Garcia <gerard@talaia.io> wrote:
> 
> Hi Piotr,
> 
> I attach the GC pauses logged a while back when the task stopped processing for several hours (it stopped at about 20:05) and a jstack dump from the last time the task hung. 
> 
> Thanks,
> 
> Gerard
> 
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <piotr@data-artisans.com> wrote:
> Hi Gerard,
> 
> I second to what Zhijiang wrote. Please check GC pauses, either via GC logging, 3rd party tool like jconsole (or some memory profiler) or via enabling resource logging in Flink. 
> 
> After confirming that this is not the issue next time this happens, instead of cancelling the job, please collect thread dumps on a process that is stuck.
> 
> Piotrek  
> 
>> On 16 Jul 2018, at 13:53, Fabian Hueske <fhueske@gmail.com> wrote:
>> 
>> Hi Gerard,
>> 
>> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been working on the networking stack lately and might have some ideas regarding your issue.
>> 
>> Best, Fabian
>> 
>> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com>:
>> Hi Gerard,
>> 
>> I thought the failed task triggers cancel process before, now I am clear that you cancel the task when it stops processing data.
>> I think you can jstack the process to find where task thread is blocked instead of canceling it, then we may find some hints.
>> 
>> In addition, the following stack "DataOutputSerializer.resize" indicates the task is serializing the record and there will be overhead byte buffers in the serializer for copying data temporarily. And if your record is too large, it may cause OOM in this process and this overhead memory is not managed by flink framework. Also you can monitor the gc status to check the full gc delay.
>> 
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From: Gerard Garcia <gerard@talaia.io>
>> Sent: July 13, 2018 (Friday) 16:22
>> To: wangzhijiang999 <wangzhijiang999@aliyun.com>
>> Cc: user <user@flink.apache.org>
>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>> 
>> Hi Zhijiang,
>> 
>> The problem is that no other task failed first. We have a task that sometimes just stops processing data, and when we cancel it, we see the log messages saying:
>> 
>> " Task (...) did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133) org.apache.flink.types.StringValue.writeString(StringValue.java:802)
>> (...)"
>> 
>> That is why we suspect that it hangs forever at that point and that is why it stops processing data. I don't see any increase in memory use in the heap (I guess because these buffers are managed by Flink) so I'm not sure if that is really the problem.
>> 
>> Gerard
>> 
>> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com> wrote:
>> Hi Gerard,
>> 
>> I think you can check the job manager log to find which task failed at first, and then trace the task manager log containing the failed task to find the initial reason.
>> The failed task will trigger canceling all the other tasks, and during canceling process, the blocked task that is waiting for output buffer can not be interrupted by the
>> canceler thread which is shown in your description. So I think the cancel process is not the key point and is in expectation. Maybe it did not cause OOM at all.
>> If the task can not be interrupted during canceling, the task manager process will be exited finally to trigger restarting the job.
>> 
>> Zhijiang
>> ------------------------------------------------------------------
>> From: Gerard Garcia <gerard@talaia.io>
>> Sent: July 2, 2018 (Monday) 18:29
>> To: wangzhijiang999 <wangzhijiang999@aliyun.com>
>> Cc: user <user@flink.apache.org>
>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>> 
>> Thanks Zhijiang,
>> 
>> We haven't found any other relevant log messages anywhere. These traces belong to the unresponsive task, that is why we suspect that at some point it did not have enough memory to serialize the message and it blocked. I've also found that when it hanged several output buffers were full (see attached image buffers.outPoolUsage.png) so I guess the traces just reflect that.
>> 
>> Probably the task hanged for some other reason and that is what filled the output buffers previous to the blocked operator. I'll have to continue investigating to find the real cause.
>> 
>> Gerard
>> 
>> 
>> 
>> 
>> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com> wrote:
>>  Hi Gerard,
>> 
>>     From the below stack, it can only indicate that the task is canceled, which may be triggered by the job manager because of another task failure. If the task can not be interrupted within the timeout config, the task manager process will be exited. Do you see any OutOfMemory messages from the task manager log?  Normally the output serialization buffer is managed by the task manager framework and will not cause OOM, and on the input deserialization side, there will be a temp bytes array on each channel for holding partial records which is not managed by the framework. I think you can confirm whether and where the OOM occurred. Maybe check the task failure logs.
>> 
>> Zhijiang
>> 
>> ------------------------------------------------------------------
>> From: gerardg <gerard@talaia.io>
>> Sent: June 30, 2018 (Saturday) 00:12
>> To: user <user@flink.apache.org>
>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>> 
>> (fixed formatting) 
>> 
>> Hello, 
>> 
>> We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to these ones: 
>> 
>> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) java.nio.ByteBuffer.wrap(ByteBuffer.java:396) org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 
>> 
>> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 
>> 
>> Our task bundles several thousand messages together, so it creates some big single messages, which could explain why the operator hangs trying to serialize them. Our problem is that when a task hangs it is very difficult to detect, and we have to manually cancel and restart it. 
>> 
>> Is there any way to make the task manager fail, or to increase the memory available for the allocation? 
>> 
>> Thanks, Gerard 
>> Sent from the Apache Flink User Mailing List archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com.
>> 
>> 
>> 
>> 
> 
> <Heap size.png><G1 Young|Old generation time.png><hang_jstack>


Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Gerard Garcia <ge...@talaia.io>.
Hi Piotr,

I attach the GC pauses logged a while back when the task stopped processing
for several hours (it stopped at about 20:05) and a jstack dump from the
last time the task hung.

Thanks,

Gerard
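
(For reference, a sketch of one way to collect this kind of data; the GC flags, file path and process-name filter below are assumptions, not necessarily what was used here:)

# GC logging for the TaskManager JVM, via flink-conf.yaml (Java 8 flags):
#   env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/taskmanager-gc.log"

# Thread dump of the stuck TaskManager process:
jps -l | grep -i taskmanager    # find the TaskManager PID
jstack <PID> > hang_jstack.txt  # replace <PID> with the PID found above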

On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi Gerard,
>
> I second to what Zhijiang wrote. Please check GC pauses, either via GC
> logging, 3rd party tool like jconsole (or some memory profiler) or via
> enabling resource logging in Flink.
>
> After confirming that this is not the issue next time this happens,
> instead of cancelling the job, please collect thread dumps on a process
> that is stuck.
>
> Piotrek
>
> On 16 Jul 2018, at 13:53, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Gerard,
>
> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have
> been working on the networking stack lately and might have some ideas
> regarding your issue.
>
> Best, Fabian
>
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com>:
>
>> Hi Gerard,
>>
>> I thought the failed task triggers cancel process before, now I am clear
>> that you cancel the task when it stops processing data.
>> I think you can jstack the process to find where task thread is blocked
>> instead of canceling it, then we may find some hints.
>>
>> In addition, the following stack "DataOutputSerializer.resize" indicates
>> the task is serializing the record and there will be overhead byte buffers
>> in the serializer for copying data temporarily. And if your record is too
>> large, it may cause OOM in this process and this overhead memory is not
>> managed by flink framework. Also you can monitor the gc status to check the
>> full gc delay.
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From: Gerard Garcia <ge...@talaia.io>
>> Sent: July 13, 2018 (Friday) 16:22
>> To: wangzhijiang999 <wa...@aliyun.com>
>> Cc: user <us...@flink.apache.org>
>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>>
>> Hi Zhijiang,
>>
>> The problem is that no other task failed first. We have a task that
>> sometimes just stops processing data, and when we cancel it, we see the
>> log messages saying:
>>
>> " Task (...) did not react to cancelling signal for 30 seconds, but is
>> stuck in method:
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
>> (...)"
>>
>> That is why we suspect that it hangs forever at that point and that is
>> why it stops processing data. I don't see any increase in memory use in the
>> heap (I guess because these buffers are managed by Flink) so I'm not sure
>> if that is really the problem.
>>
>> Gerard
>>
>> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
>> wangzhijiang999@aliyun.com> wrote:
>> Hi Gerard,
>>
>> I think you can check the job manager log to find which task failed at
>> first, and then trace the task manager log containing the failed task to
>> find the initial reason.
>> The failed task will trigger canceling all the other tasks, and during
>> canceling process, the blocked task that is waiting for output buffer can
>> not be interrupted by the
>> canceler thread which is shown in your description. So I think the
>> cancel process is not the key point and is in expectation. Maybe it did not
>> cause OOM at all.
>> If the task can not be interrupted during canceling, the task manager process
>> will be exited finally to trigger restarting the job.
>>
>> Zhijiang
>> ------------------------------------------------------------------
>> From: Gerard Garcia <ge...@talaia.io>
>> Sent: July 2, 2018 (Monday) 18:29
>> To: wangzhijiang999 <wa...@aliyun.com>
>> Cc: user <us...@flink.apache.org>
>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>>
>> Thanks Zhijiang,
>>
>> We haven't found any other relevant log messages anywhere. These traces
>> belong to the unresponsive task, that is why we suspect that at some point
>> it did not have enough memory to serialize the message and it blocked. I've
>> also found that when it hanged several output buffers were full (see
>> attached image buffers.outPoolUsage.png) so I guess the traces just reflect
>> that.
>>
>> Probably the task hanged for some other reason and that is what filled
>> the output buffers previous to the blocked operator. I'll have to continue
>> investigating to find the real cause.
>>
>> Gerard
>>
>>
>>
>>
>> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
>> wangzhijiang999@aliyun.com> wrote:
>>  Hi Gerard,
>>
>>     From the below stack, it can only indicate that the task is canceled, which
>> may be triggered by the job manager because of another task failure. If the task
>> can not be interrupted within the timeout config, the task manager process will
>> be exited. Do you see any OutOfMemory messages from the task manager log?
>> Normally the output serialization buffer is managed by the task manager
>> framework and will not cause OOM, and on the input deserialization side,
>> there will be a temp bytes array on each channel for holding partial
>> records which is not managed by the framework. I think you can confirm whether
>> and where the OOM occurred. Maybe check the task failure logs.
>>
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From: gerardg <ge...@talaia.io>
>> Sent: June 30, 2018 (Saturday) 00:12
>> To: user <us...@flink.apache.org>
>> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>>
>> (fixed formatting)
>>
>> Hello,
>>
>> We have experienced some problems where a task just hangs without showing
>> any kind of log error while other tasks running in the same task manager
>> continue without problems. When these tasks are restarted the task manager
>> gets killed and shows several errors similar to these ones:
>>
>> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
>> seconds, but is stuck in method:
>> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
>> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
>> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>> scala.collection.immutable.List.foreach(List.scala:392)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> (...)
>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> (...)
>> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>> java.lang.Thread.run(Thread.java:748)
>>
>> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
>> seconds, but is stuck in method:
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>> scala.collection.immutable.List.foreach(List.scala:392)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> (...)
>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> (...)
>> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>> java.lang.Thread.run(Thread.java:748)
>>
>> Our task bundles several thousand messages together, so it creates some
>> big single messages, which could explain why the operator hangs trying to
>> serialize the message. Our problem is that when a task hangs it is very
>> difficult to detect and we have to manually cancel and restart it.
>>
>> Is there any way to make the task manager fail or to increase the memory
>> available for the allocation?
>>
>> Thanks, Gerard
>> ------------------------------
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> at Nabble.com.
>>
>>
>>
>>
>
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi Gerard,

I second what Zhijiang wrote. Please check for GC pauses, either via GC logging, a 3rd-party tool like jconsole (or some memory profiler), or by enabling resource logging in Flink.

After confirming that this is not the issue, the next time this happens please collect thread dumps from the stuck process instead of cancelling the job.
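
For example, GC logging and Flink's periodic memory logging could be enabled roughly like this (a minimal sketch, assuming a Flink 1.5-era flink-conf.yaml and a HotSpot JDK 8; the exact option names should be checked against the documentation for the version in use):

  # flink-conf.yaml: pass GC logging flags to the TaskManager JVMs
  env.java.opts.taskmanager: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-tm-gc.log"

  # periodically log heap, direct memory and GC stats from inside the TaskManager
  taskmanager.debug.memory.startLogThread: true
  taskmanager.debug.memory.logIntervalMs: 10000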

Piotrek  

> On 16 Jul 2018, at 13:53, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Gerard,
> 
> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been working on the networking stack lately and might have some ideas regarding your issue.
> 
> Best, Fabian
> 
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>>:
> Hi Gerard,
> 
> I thought a failed task had triggered the cancel process; now I am clear that you cancel the task yourself when it stops processing data.
> Instead of canceling it, I think you can jstack the process to find where the task thread is blocked; then we may find some hints.
>
> In addition, the following stack "DataOutputSerializer.resize" indicates the task is serializing a record, and there will be overhead byte buffers in the serializer for copying data temporarily. If your record is too large, it may cause OOM in this process, and this overhead memory is not managed by the Flink framework. Also, you can monitor the GC status to check the full GC delay.
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Gerard Garcia <gerard@talaia.io <ma...@talaia.io>>
> 发送时间:2018年7月13日(星期五) 16:22
> 收件人:wangzhijiang999 <wangzhijiang999@aliyun.com <ma...@aliyun.com>>
> 抄 送:user <user@flink.apache.org <ma...@flink.apache.org>>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> Hi Zhijiang,
> 
> The problem is that no other task failed first. We have a task that sometimes just stops processing data, and when we cancel it, we see the log messages saying:
> 
> " Task (...) did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133) org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
> 
> That is why we suspect that it hangs forever at that point and that is why it stops processing data. I don't see any increase in memory use in the heap (I guess because these buffers are managed by Flink), so I'm not sure if that is really the problem.
> 
> Gerard
> 
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>> wrote:
> Hi Gerard,
> 
> I think you can check the job manager log to find which task failed at first, and then trace the task manager log containing the failed task to find the initial reason.
> The failed task will trigger canceling all the other tasks, and during the canceling process, the blocked task that is waiting for an output buffer cannot be interrupted by the
> canceler thread, which is shown in your description. So I think the cancel process is not the key point and is expected. Maybe it did not cause OOM at all.
> If the task cannot be interrupted during canceling, the task manager process will be exited eventually to trigger restarting the job.
> 
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Gerard Garcia <gerard@talaia.io <ma...@talaia.io>>
> 发送时间:2018年7月2日(星期一) 18:29
> 收件人:wangzhijiang999 <wangzhijiang999@aliyun.com <ma...@aliyun.com>>
> 抄 送:user <user@flink.apache.org <ma...@flink.apache.org>>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> Thanks Zhijiang,
> 
> We haven't found any other relevant log messages anywhere. These traces belong to the unresponsive task, that is why we suspect that at some point it did not have enough memory to serialize the message and it blocked. I've also found that when it hanged several output buffers were full (see attached image buffers.outPoolUsage.png) so I guess the traces just reflect that.
> 
> Probably the task hanged for some other reason and that is what filled the output buffers previous to the blocked operator. I'll have to continue investigating to find the real cause.
> 
> Gerard
> 
> 
> 
> 
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wangzhijiang999@aliyun.com <ma...@aliyun.com>> wrote:
>  Hi Gerard,
> 
>     From the below stack, it can only indicate the task is canceled that may be triggered by job manager becuase of other task failure. If the task can not be interrupted within timeout config, the task managerprocess will be exited. Do you see any OutOfMemory messages from the task manager log?  Normally the ouput serialization buffer is managed by task manager framework and will not cause OOM, and on the input desearialization side, there will be a temp bytes array on each channel for holding partial records which is not managed by framework. I think you can confirm whether and where caused the OOM. Maybe check the task failure logs.
> 
> Zhijiang
> 
> ------------------------------------------------------------------
> 发件人:gerardg <gerard@talaia.io <ma...@talaia.io>>
> 发送时间:2018年6月30日(星期六) 00:12
> 收件人:user <user@flink.apache.org <ma...@flink.apache.org>>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> (fixed formatting) 
> 
> Hello, 
> 
> We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to these ones: 
> 
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) java.nio.ByteBuffer.wrap(ByteBuffer.java:396) org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 
> 
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 
> 
> Our task bundles several thousand of messages together so it creates some big single messages which could explain why the operator hangs trying to serialize the message. Our problem is that when a task hangs is very difficult to detect and we have to manually cancel and restart it. 
> 
> Is there any way to make the task manager fail or to increase the memory required by the allocation? 
> 
> Thanks, Gerard 
> Sent from the Apache Flink User Mailing List archive. mailing list archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com.
> 
> 
> 
> 


Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Gerard,

Thanks for reporting this issue. I'm pulling in Nico and Piotr who have
been working on the networking stack lately and might have some ideas
regarding your issue.

Best, Fabian

2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <
wangzhijiang999@aliyun.com>:

> Hi Gerard,
>
> I thought a failed task had triggered the cancel process; now I am clear
> that you cancel the task yourself when it stops processing data.
> Instead of canceling it, I think you can jstack the process to find where
> the task thread is blocked; then we may find some hints.
>
> In addition, the following stack "DataOutputSerializer.resize" indicates
> the task is serializing a record, and there will be overhead byte buffers
> in the serializer for copying data temporarily. If your record is too
> large, it may cause OOM in this process, and this overhead memory is not
> managed by the Flink framework. Also, you can monitor the GC status to
> check the full GC delay.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> 发件人:Gerard Garcia <ge...@talaia.io>
> 发送时间:2018年7月13日(星期五) 16:22
> 收件人:wangzhijiang999 <wa...@aliyun.com>
> 抄 送:user <us...@flink.apache.org>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Hi Zhijiang,
>
> The problem is that no other task failed first. We have a task that
> sometimes just stops processing data, and when we cancel it, we see the
> log messages saying:
>
> " Task (...) did not react to cancelling signal for 30 seconds, but is
> stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
>
> That is why we suspect that it hangs forever at that point and that is why
> it stops processing data. I don't see any increase in memory use in the
> heap (I guess because these buffers are managed by Flink) so I'm not sure
> if that is really the problem.
>
> Gerard
>
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com> wrote:
> Hi Gerard,
>
> I think you can check the job manager log to find which task failed at
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during the
> canceling process, the blocked task that is waiting for an output buffer
> cannot be interrupted by the canceler thread, which is shown in your
> description. So I think the cancel process is not the key point and is
> expected. Maybe it did not cause OOM at all.
> If the task cannot be interrupted during canceling, the task manager
> process will be exited eventually to trigger restarting the job.
>
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Gerard Garcia <ge...@talaia.io>
> 发送时间:2018年7月2日(星期一) 18:29
> 收件人:wangzhijiang999 <wa...@aliyun.com>
> 抄 送:user <us...@flink.apache.org>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task, that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hanged several output buffers were full (see
> attached image buffers.outPoolUsage.png) so I guess the traces just
> reflect that.
>
> Probably the task hanged for some other reason and that is what filled the
> output buffers previous to the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com> wrote:
>  Hi Gerard,
>
>     From the below stack, it can only indicate that the task was canceled,
> which may have been triggered by the job manager because of another task's
> failure. If the task cannot be interrupted within the timeout config, the
> task manager process will be exited. Do you see any OutOfMemory messages in
> the task manager log? Normally the output serialization buffer is managed
> by the task manager framework and will not cause OOM; on the input
> deserialization side, there is a temp bytes array on each channel for
> holding partial records which is not managed by the framework. I think you
> can confirm whether and where the OOM occurred. Maybe check the task
> failure logs.
>
> Zhijiang
>
> ------------------------------------------------------------------
> 发件人:gerardg <ge...@talaia.io>
> 发送时间:2018年6月30日(星期六) 00:12
> 收件人:user <us...@flink.apache.org>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.
> serialize(IntSerializer.java:63) org.apache.flink.api.common.
> typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.
> typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$
> anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$
> anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(
> TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.
> TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.
> typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.
> serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.
> runtime.streamrecord.StreamElementSerializer.serialize(
> StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.
> SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.
> SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:81) org.apache.flink.streaming.
> runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:89) org.apache.flink.streaming.
> runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...) org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:50) org.apache.flink.streaming.
> runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.
> runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...) org.apache.flink.streaming.api.scala.function.util.
> ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.
> runtime.operators.windowing.functions.InternalIterableProcessWindowF
> unction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.
> emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.
> runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method: org.apache.flink.core.memory.
> DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.
> serialize(IntSerializer.java:63) org.apache.flink.api.common.
> typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.
> typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.
> typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$
> anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$
> anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(
> TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.
> TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.
> typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.
> serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.
> runtime.streamrecord.StreamElementSerializer.serialize(
> StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.
> SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.
> SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.
> emit(StreamRecordWriter.java:81) org.apache.flink.streaming.
> runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.
> collect(RecordWriterOutput.java:89) org.apache.flink.streaming.
> runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...) org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:50) org.apache.flink.streaming.
> runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.
> runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...) org.apache.flink.streaming.api.scala.function.util.
> ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.
> runtime.operators.windowing.functions.InternalIterableProcessWindowF
> unction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.
> emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.
> runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> Our task bundles several thousand messages together, so it creates some
> big single messages, which could explain why the operator hangs trying to
> serialize the message. Our problem is that when a task hangs it is very
> difficult to detect and we have to manually cancel and restart it.
>
> Is there any way to make the task manager fail or to increase the memory
> available for the allocation?
>
> Thanks, Gerard
> ------------------------------
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>
>
>
>

回复:Flink job hangs/deadlocks (possibly related to out of memory)

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
Hi Gerard,

I thought a failed task had triggered the cancel process; now I am clear that you cancel the task yourself when it stops processing data.
Instead of canceling it, I think you can jstack the process to find where the task thread is blocked; then we may find some hints.

In addition, the following stack "DataOutputSerializer.resize" indicates the task is serializing a record, and there will be overhead byte buffers in the serializer for copying data temporarily. If your record is too large, it may cause OOM in this process, and this overhead memory is not managed by the Flink framework. Also, you can monitor the GC status to check the full GC delay.
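
For instance, something along these lines could capture the stack and the GC behaviour while the task is stuck (a rough sketch, assuming shell access to the TaskManager host and a standard JDK; the PID shown is hypothetical):

  # find the TaskManager JVM (the main class name may differ between Flink versions)
  jps -l | grep -i taskmanager
  # e.g. "12345 org.apache.flink.runtime.taskexecutor.TaskManagerRunner"

  # dump all thread stacks of the stuck process instead of cancelling the job
  jstack 12345 > /tmp/taskmanager-threads.txt

  # sample GC activity every second to see whether full GCs coincide with the stall
  jstat -gcutil 12345 1000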

Best,
Zhijiang
------------------------------------------------------------------
发件人:Gerard Garcia <ge...@talaia.io>
发送时间:2018年7月13日(星期五) 16:22
收件人:wangzhijiang999 <wa...@aliyun.com>
抄 送:user <us...@flink.apache.org>
主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)

Hi Zhijiang,

The problem is that no other task failed first. We have a task that sometimes just stops processing data, and when we cancel it, we see the log messages saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133) org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why it stops processing data. I don't see any increase in memory use in the heap (I guess because these buffers are managed by Flink), so I'm not sure if that is really the problem.

Gerard
On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
Hi Gerard,

I think you can check the job manager log to find which task failed at first, and then trace the task manager log containing the failed task to find the initial reason.
The failed task will trigger canceling all the other tasks, and during the canceling process, the blocked task that is waiting for an output buffer cannot be interrupted by the canceler thread, which is shown in your description. So I think the cancel process is not the key point and is expected. Maybe it did not cause OOM at all.
If the task cannot be interrupted during canceling, the task manager process will be exited eventually to trigger restarting the job.

Zhijiang
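
To locate the first failure the way Zhijiang suggests above, something like the following could work (a sketch only; the exact log wording and file names are assumptions and depend on the Flink version and setup):

  # in the job manager log, find the first task that switched to FAILED
  grep -n "switched from RUNNING to FAILED" log/flink-*-jobmanager-*.log | head -n 5

  # then follow that task name in the task manager logs to find the root cause
  grep -n "<task name from the previous step>" log/flink-*-taskmanager-*.log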
------------------------------------------------------------------
发件人:Gerard Garcia <ge...@talaia.io>
发送时间:2018年7月2日(星期一) 18:29
收件人:wangzhijiang999 <wa...@aliyun.com>
抄 送:user <us...@flink.apache.org>
主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong to the unresponsive task, that is why we suspect that at some point it did not have enough memory to serialize the message and it blocked. I've also found that when it hanged several output buffers were full (see attached image buffers.outPoolUsage.png) so I guess the traces just reflect that.

Probably the task hanged for some other reason and that is what filled the output buffers previous to the blocked operator. I'll have to continue investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
 Hi Gerard,

    From the below stack, it can only indicate the task is canceled that may be triggered by job manager becuase of other task failure. If the task can not be interrupted within timeout config, the task managerprocess will be exited. Do you see any OutOfMemory messages from the task manager log?  Normally the ouput serialization buffer is managed by task manager framework and will not cause OOM, and on the input desearialization side, there will be a temp bytes array on each channel for holding partial records which is not managed by framework. I think you can confirm whether and where caused the OOM. Maybe check the task failure logs.

Zhijiang

------------------------------------------------------------------
发件人:gerardg <ge...@talaia.io>
发送时间:2018年6月30日(星期六) 00:12
收件人:user <us...@flink.apache.org>
主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) java.nio.ByteBuffer.wrap(ByteBuffer.java:396) org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 

 Our task bundles several thousand messages together, so it creates some big single messages, which could explain why the operator hangs trying to serialize the message. Our problem is that when a task hangs it is very difficult to detect, and we have to manually cancel and restart it.
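For example, something along these lines would emit the window contents in bounded chunks so that each serialized record stays small. This is only a minimal sketch with a made-up Event type and chunk size, not our real code:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical element type, just for illustration.
case class Event(id: Int, payload: String)

// Emits the window contents in chunks of `chunkSize` elements instead of a
// single record holding the whole window, so each serialized record stays small.
class ChunkedWindowEmit(chunkSize: Int)
    extends ProcessWindowFunction[Event, Seq[Event], String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Event],
                       out: Collector[Seq[Event]]): Unit = {
    elements.grouped(chunkSize).foreach(chunk => out.collect(chunk.toSeq))
  }
}

It would be plugged in with .process(new ChunkedWindowEmit(500)) on the keyed, windowed stream; the trade-off is that downstream operators then receive several smaller records per window instead of one big one.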

 Is there any way to make the task manager fail when this happens, or to increase the memory available for the allocation?
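As a sketch of what we mean, these are the kind of flink-conf.yaml settings that seem relevant (keys as they exist around Flink 1.5, values made up; note that the cancellation watchdog only applies once a cancel has actually been triggered, it does not detect the hang by itself):

# How often the canceler re-interrupts a task that does not react, and after
# how long the whole TaskManager process is killed if cancellation never finishes.
task.cancellation.interval: 30000
task.cancellation.timeout: 180000

# More heap for the task manager, since the serialization buffer that resize()
# grows in the stack above is a plain on-heap byte array.
taskmanager.heap.mb: 4096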

 Thanks, Gerard 
 Sent from the Apache Flink User Mailing List archive at Nabble.com.




Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by Gerard Garcia <ge...@talaia.io>.
Hi Zhijiang,

The problem is that no other task failed first. We have a task that
sometimes just stops processing data, and when we cancel it, we see log
messages saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is
stuck in method:
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point, which is why
it stops processing data. I don't see any increase in heap memory use
(I guess because these buffers are managed by Flink), so I'm not sure
if that is really the problem.
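One way to at least detect this situation automatically would be to poll the same buffer metrics over the monitoring REST API, for example (the endpoint and metric id are from memory and may differ between Flink versions; <job-id> and <vertex-id> are placeholders):

curl "http://jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=0.buffers.outPoolUsage"

If that value stays pinned at 1.0 while numRecordsOut for the same subtask does not advance, the task is most likely stuck.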

Gerard

On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
wangzhijiang999@aliyun.com> wrote:

> Hi Gerard,
>
> I think you can check the job manager log to find which task failed at
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during
> canceling process, the blocked task that is waiting for output buffer can
> not be interrupted by the
> canceler thread which is shown in your description. So I think the cancel
> process is not the key point and is in expectation. Maybe it did not cause
> OOM at all.
> If the taskduring canceling, the task manager process will be exited
> finally to trigger restarting the job.
>
> Zhijiang
>
> ------------------------------------------------------------------
> 发件人:Gerard Garcia <ge...@talaia.io>
> 发送时间:2018年7月2日(星期一) 18:29
> 收件人:wangzhijiang999 <wa...@aliyun.com>
> 抄 送:user <us...@flink.apache.org>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task, that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hanged several output buffers were full (see
> attached image buffers.outPoolUsage.png) so I guess the traces just reflect
> that.
>
> Probably the task hanged for some other reason and that is what filled the
> output buffers previous to the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com> wrote:
>  Hi Gerard,
>
>     From the below stack, it can only indicate the task is canceled that
> may be triggered by job manager becuase of other task failure. If the task
> can not be interrupted within timeout config, the task managerprocess will
> be exited. Do you see any OutOfMemory messages from the task manager log?
> Normally the ouput serialization buffer is managed by task manager
> framework and will not cause OOM, and on the input desearialization side,
> there will be a temp bytes array on each channel for holding partial
> records which is not managed by framework. I think you can confirm whether
> and where caused the OOM. Maybe check the task failure logs.
>
> Zhijiang
>
> ------------------------------------------------------------------
> 发件人:gerardg <ge...@talaia.io>
> 发送时间:2018年6月30日(星期六) 00:12
> 收件人:user <us...@flink.apache.org>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> Our task bundles several thousand of messages together so it creates some
> big single messages which could explain why the operator hangs trying to
> serialize the message. Our problem is that when a task hangs is very
> difficult to detect and we have to manually cancel and restart it.
>
> Is there any way to make the task manager fail or to increase the memory
> required by the allocation?
>
> Thanks, Gerard
> ------------------------------
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>
>
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
Hi Gerard,

I think you can check the job manager log to find which task failed first, and then trace the log of the task manager containing the failed task to find the initial reason.
The failed task will trigger canceling all the other tasks, and during the canceling process a blocked task that is waiting for an output buffer cannot be interrupted by the
canceler thread, which is what your stack trace shows. So I think the cancel process is not the key point and is expected behavior. Maybe it did not cause OOM at all.
If the task cannot be interrupted during canceling, the task manager process will eventually be exited to trigger restarting the job.
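For example, grepping the job manager log for task state transitions (the exact wording of these log lines may differ between Flink versions) should show which execution switched to FAILED first:

grep "switched from RUNNING to FAILED" jobmanager.log | head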

Zhijiang
------------------------------------------------------------------
发件人:Gerard Garcia <ge...@talaia.io>
发送时间:2018年7月2日(星期一) 18:29
收件人:wangzhijiang999 <wa...@aliyun.com>
抄 送:user <us...@flink.apache.org>
主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong to the unresponsive task, that is why we suspect that at some point it did not have enough memory to serialize the message and it blocked. I've also found that when it hanged several output buffers were full (see attached image buffers.outPoolUsage.png) so I guess the traces just reflect that.

Probably the task hanged for some other reason and that is what filled the output buffers previous to the blocked operator. I'll have to continue investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
 Hi Gerard,

    From the below stack, it can only indicate the task is canceled that may be triggered by job manager becuase of other task failure. If the task can not be interrupted within timeout config, the task managerprocess will be exited. Do you see any OutOfMemory messages from the task manager log?  Normally the ouput serialization buffer is managed by task manager framework and will not cause OOM, and on the input desearialization side, there will be a temp bytes array on each channel for holding partial records which is not managed by framework. I think you can confirm whether and where caused the OOM. Maybe check the task failure logs.

Zhijiang

------------------------------------------------------------------
发件人:gerardg <ge...@talaia.io>
发送时间:2018年6月30日(星期六) 00:12
收件人:user <us...@flink.apache.org>
主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) java.nio.ByteBuffer.wrap(ByteBuffer.java:396) org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method: org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305) org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63) org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98) org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93) scala.collection.immutable.List.foreach(List.scala:392) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93) org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113) org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177) org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49) org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88) org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446) org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) (...) org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550) org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403) org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) java.lang.Thread.run(Thread.java:748) 

 Our task bundles several thousand of messages together so it creates some big single messages which could explain why the operator hangs trying to serialize the message. Our problem is that when a task hangs is very difficult to detect and we have to manually cancel and restart it. 

 Is there any way to make the task manager fail or to increase the memory required by the allocation? 

 Thanks, Gerard 
 Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.