Posted to user@flink.apache.org by Ori Popowski <or...@gmail.com> on 2020/07/01 12:10:49 UTC

Re: Heartbeat of TaskManager timed out.

I've found that one of my TaskManagers occasionally experiences a GC pause
of 40-50 seconds, and I have no idea why.
I profiled one of the machines using JProfiler and everything looks fine:
no memory leaks, and memory usage is low.
However, I cannot predict which machine will get the 40-50 second pause,
and I cannot profile all of them all the time.
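
Since it isn't feasible to attach a profiler to every machine, one option is
to scan each TaskManager's GC log after the fact for stop-the-world pauses
that reach the heartbeat timeout (50s by default). A minimal sketch, assuming
GC logging is enabled and that Full GC lines look like
`[Full GC (Allocation Failure)  28944M->26018M(28960M), 51.5256128 secs]`;
`GcPauseScan` and its members are hypothetical names:

```scala
object GcPauseScan {
  // Assumed line shape (JDK 8 style G1 log):
  //   [Full GC (Allocation Failure)  28944M->26018M(28960M), 51.5256128 secs]
  private val FullGc =
    """\[Full GC \(([^)]*)\)\s+(\d+)M->(\d+)M\((\d+)M\), ([0-9.]+) secs\]""".r.unanchored

  final case class FullGcEvent(cause: String, beforeMb: Long, afterMb: Long, pauseSecs: Double) {
    // Heap reclaimed by this collection, in megabytes.
    def freedMb: Long = beforeMb - afterMb
  }

  // Returns the parsed event if the line contains a Full GC entry, else None.
  def parse(line: String): Option[FullGcEvent] = line match {
    case FullGc(cause, before, after, _, secs) =>
      Some(FullGcEvent(cause, before.toLong, after.toLong, secs.toDouble))
    case _ => None
  }

  // Full GC events whose pause is at least `timeoutSecs` seconds.
  def longPauses(lines: Iterator[String], timeoutSecs: Double = 50.0): List[FullGcEvent] =
    lines.flatMap(line => parse(line).iterator).filter(_.pauseSecs >= timeoutSecs).toList
}
```

Feeding it `scala.io.Source.fromFile(path).getLines()` for each collected log
would list the pauses worth investigating, together with how little memory
each one reclaimed.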

Any suggestions?

On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com> wrote:

> In Flink 1.10, there's a huge change in the memory management compared to
> previous versions. This could be related to your observations, because with
> the same configurations, it is possible that there's less JVM heap space
> (with more off-heap memory). Please take a look at this migration guide [1].
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>
> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com> wrote:
>
>> Thanks for the suggestions!
>>
>> > I recently tried 1.10 and see this error frequently, and I don't have
>> > the same issue when running with 1.9.1.
>> I did downgrade to Flink 1.9, and there is no change in how often the
>> heartbeat timeout occurs.
>>
>>
>> >
>>
>>    - Probably the most straightforward way is to try increasing the
>>    timeout to see if that helps. You can leverage the configuration option
>>    `heartbeat.timeout`[1]. The default is 50s.
>>    - It might be helpful to share your configuration setups (e.g., the
>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>    and all the loaded configurations.
>>    - You may want to look into the GC logs in addition to the metrics.
>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>    recent metrics due to the process not responding to the metric querying
>>    services.
>>    - You may also look into the status of the JM process. If JM is under
>>    significant GC pressure, it could also happen that the heartbeat message
>>    from TM is not timely handled before the timeout check.
>>    - Are there any metrics monitoring the network condition between the
>>    JM and the timed-out TM? Possibly any jitter?
>>
>>
>> Weirdly enough, I did manage to find a problem with the timed-out
>> TaskManagers, which I missed the last time I checked: the timed-out
>> TaskManager is always the one with the max. GC time (young generation). I
>> see it only now that I run with G1GC; with the previous GC this wasn't
>> the case.
>>
>> Does anyone know what can cause high GC time and how to mitigate this?
>>
>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <to...@gmail.com>
>> wrote:
>>
>>> Hi Ori,
>>>
>>> Here are some suggestions from my side.
>>>
>>>    - Probably the most straightforward way is to try increasing the
>>>    timeout to see if that helps. You can leverage the configuration option
>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>    - It might be helpful to share your configuration setups (e.g., the
>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>>    and all the loaded configurations.
>>>    - You may want to look into the GC logs in addition to the metrics.
>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>    recent metrics due to the process not responding to the metric querying
>>>    services.
>>>    - You may also look into the status of the JM process. If JM is
>>>    under significant GC pressure, it could also happen that the heartbeat
>>>    message from TM is not timely handled before the timeout check.
>>>    - Are there any metrics monitoring the network condition between the
>>>    JM and the timed-out TM? Possibly any jitter?
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>
>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>> partitions and I have parallelism of 189.
>>>>
>>>> Currently running with RocksDB, with checkpointing disabled. My state
>>>> size is approx. 500 GB.
>>>>
>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors for
>>>> no apparent reason.
>>>>
>>>> I checked the container that gets the timeout for GC pauses, heap memory,
>>>> direct memory, mapped memory, off-heap memory, CPU load, network load, total
>>>> out-records, total in-records, backpressure, and everything else I can
>>>> think of. All those metrics show nothing unusual: the container has
>>>> roughly average values for all of them, and a lot of other containers
>>>> score higher.
>>>>
>>>> All the metrics are very low because every TaskManager runs alone on
>>>> an r5.2xlarge machine.
>>>>
>>>> I've been trying to debug this for days and I cannot find any explanation
>>>> for it.
>>>>
>>>> Can someone explain why it's happening?
>>>>
>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Thanks
>>>>
>>>

Re: Heartbeat of TaskManager timed out.

Posted by Ori Popowski <or...@gmail.com>.
I wouldn't want to jump to conclusions, but from what I see, very large
lists and vectors do not work well with flatten in Scala 2.11, each for its
own reasons.

In any case, it's 100% not a Flink issue.
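
The wrap-around mentioned in the quoted update below is easy to reproduce:
summing lengths as `Int` overflows past `Int.MaxValue`, and a JVM array cannot
hold more elements than that anyway. A minimal sketch of a guard, assuming the
input is a `Seq[Array[Byte]]` (the real type of `recordingBytes` is not shown
in this thread); `SafeFlatten`, `totalLength` and `flattenBytes` are
hypothetical names, not the code from the actual job:

```scala
object SafeFlatten {
  // Sum the chunk lengths as Long so the total cannot wrap around.
  // For comparison, Seq(Int.MaxValue, 1).sum evaluates to Int.MinValue.
  def totalLength(chunks: Seq[Array[Byte]]): Long =
    chunks.foldLeft(0L)((acc, chunk) => acc + chunk.length)

  // Concatenates the chunks into one array, or returns Left with a message
  // when the result could not fit into a single Int-indexed JVM array.
  // (Assumption: the practical limit may be slightly below Int.MaxValue on
  // some JVMs, so a real job might want a lower bound.)
  def flattenBytes(chunks: Seq[Array[Byte]]): Either[String, Array[Byte]] = {
    val total = totalLength(chunks)
    if (total > Int.MaxValue) {
      Left(s"combined size $total exceeds Int.MaxValue")
    } else {
      val out = new Array[Byte](total.toInt)
      var offset = 0
      chunks.foreach { chunk =>
        System.arraycopy(chunk, 0, out, offset, chunk.length)
        offset += chunk.length
      }
      Right(out)
    }
  }
}
```

Returning `Left` instead of throwing lets the caller decide whether to skip,
split, or log an oversized record rather than catching
NegativeArraySizeException after the fact.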

On Tue, Jul 7, 2020 at 10:10 AM Xintong Song <to...@gmail.com> wrote:

> Thanks for the updates, Ori.
>
> I'm not familiar with Scala. Just curious, if what you suspect is true, is
> it a bug of Scala?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jul 7, 2020 at 1:41 PM Ori Popowski <or...@gmail.com> wrote:
>
>> Hi,
>>
>> I just wanted to update that the problem is now solved!
>>
>> I suspect that Scala's flatten() method has a memory problem on very
>> large lists (> 2 billion elements). When using Scala Lists, the memory
>> seems to leak but the app keeps running, and when using Scala Vectors, a
>> weird IllegalArgumentException is thrown [1].
>>
>> I implemented my own flatten() method using Arrays and quickly ran into
>> a NegativeArraySizeException, since the integer representing the array
>> size wrapped around at Int.MaxValue and became negative. After I started
>> catching this exception, all my cluster problems were resolved:
>> checkpoints, the heartbeat timeout, and also the memory and CPU
>> utilization.
>>
>> I still need to confirm my suspicion towards Scala's flatten() though,
>> since I haven't "lab-tested" it.
>>
>> [1] https://github.com/NetLogo/NetLogo/issues/1830
>>
>> On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski <or...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I initially thought so too, which is why my heap is almost 30 GiB.
>>> However, I started to analyze the Java Flight Recorder files, and I
>>> suspect there's a memory leak in Scala's flatten() method.
>>> I changed the line that uses flatten(): instead of calling flatten() I'm
>>> just creating a byte array of the size flatten() would have returned, and
>>> I no longer have the heartbeat problem.
>>>
>>> So now my code is
>>>     val recordingData =
>>>       Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>>>
>>> instead of
>>>     val recordingData = recordingBytes.flatten
>>>
>>> I attach a screenshot of Java Mission Control
>>>
>>>
>>>
>>> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <to...@gmail.com>
>>> wrote:
>>>
>>>> I agree with Roman's suggestion for increasing heap size.
>>>>
>>>> It seems that the heap fills up faster than it is freed. Eventually a
>>>> Full GC is triggered, taking more than 50s and causing the timeout.
>>>> However, even the Full GC frees only about 3GB out of the 28GB max size.
>>>> That probably suggests that the max heap size is not sufficient.
>>>>
>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)  28944M->26018M(28960M), 51.5256128 secs]
>>>>>     [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
>>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>
>>>>
>>>> I would not be so sure about the memory leak. I think it could be a
>>>> normal pattern that memory keeps growing as more data is processed. E.g.,
>>>> from the provided log, I see window operation tasks executed in the task
>>>> manager. Such operation might accumulate data until the window is emitted.
>>>>
>>>> Maybe Ori you can also take a look at the task manager log when the job
>>>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>>>> I mentioned before, it is possible that, with the same configurations Flink
>>>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>>>> changes.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <or...@gmail.com> wrote:
>>>>
>>>>> Thank you very much for your analysis.
>>>>>
>>>>> When I said there was no memory leak, I meant in the specific
>>>>> TaskManager I monitored in real time using JProfiler.
>>>>> Unfortunately, this problem occurs in only one of the TaskManagers, and
>>>>> you cannot anticipate which. So when you pick a TM to profile at random,
>>>>> everything looks fine.
>>>>>
>>>>> I'm running the job again with Java Flight Recorder now, and I hope
>>>>> I'll find the reason for the memory leak.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Ori
>>>>>>
>>>>>> From the log, it looks like there IS a memory leak.
>>>>>>
>>>>>> At 10:12:53 there was the last "successful" GC, when 13GB was freed in
>>>>>> 0.4653809 secs:
>>>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>>>
>>>>>> Then the heap grew from 10G to 28G with GC not being able to free up
>>>>>> enough space:
>>>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap: 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap: 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>>>>> ... ...
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>>>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap: 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>>>>
>>>>>> Until 10:15:12, when a Full GC freed almost 3G, but it took 51 seconds
>>>>>> and the heartbeat timed out:
>>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)  28944M->26018M(28960M), 51.5256128 secs]
>>>>>>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
>>>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>>>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of JobManager with id bc59ba6a
>>>>>>
>>>>>> No substantial amount of memory was freed after that.
>>>>>>
>>>>>> If this memory usage pattern is expected, I'd suggest:
>>>>>> 1. increasing the heap size
>>>>>> 2. playing with the PrintStringDeduplicationStatistics and
>>>>>> UseStringDeduplication flags - string deduplication is probably making
>>>>>> G1 slower than CMS
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <or...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>>>>>>
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <to...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Maybe you can share the log and gc-log of the problematic
>>>>>>>> TaskManager? See if we can find any clue.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've found that one of my TaskManagers occasionally experiences a
>>>>>>>>> GC pause of 40-50 seconds, and I have no idea why.
>>>>>>>>> I profiled one of the machines using JProfiler and everything
>>>>>>>>> looks fine: no memory leaks, and memory usage is low.
>>>>>>>>> However, I cannot predict which machine will get the 40-50 second
>>>>>>>>> pause, and I cannot profile all of them all the time.
>>>>>>>>>
>>>>>>>>> Any suggestions?
>>>>>>>>>
>>>>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <
>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> In Flink 1.10, there's a huge change in the memory management
>>>>>>>>>> compared to previous versions. This could be related to your observations,
>>>>>>>>>> because with the same configurations, it is possible that there's less JVM
>>>>>>>>>> heap space (with more off-heap memory). Please take a look at this
>>>>>>>>>> migration guide [1].
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>>
>>>>>>>>>>> > I recently tried 1.10 and see this error frequently, and I
>>>>>>>>>>> > don't have the same issue when running with 1.9.1.
>>>>>>>>>>> I did downgrade to Flink 1.9, and there is no change in how
>>>>>>>>>>> often the heartbeat timeout occurs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>>    querying services.
>>>>>>>>>>>    - You may also look into the status of the JM process. If JM
>>>>>>>>>>>    is under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Weirdly enough, I did manage to find a problem with the timed-out
>>>>>>>>>>> TaskManagers, which I missed the last time I checked: the timed-out
>>>>>>>>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>>>>>>>>> see it only now that I run with G1GC; with the previous GC this wasn't
>>>>>>>>>>> the case.
>>>>>>>>>>>
>>>>>>>>>>> Does anyone know what can cause high GC time and how to mitigate
>>>>>>>>>>> this?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <
>>>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ori,
>>>>>>>>>>>>
>>>>>>>>>>>> Here are some suggestions from my side.
>>>>>>>>>>>>
>>>>>>>>>>>>    - Probably the most straightforward way is to try
>>>>>>>>>>>>    increasing the timeout to see if that helps. You can leverage the
>>>>>>>>>>>>    configuration option `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>>>    querying services.
>>>>>>>>>>>>    - You may also look into the status of the JM process. If
>>>>>>>>>>>>    JM is under significant GC pressure, it could also happen that the
>>>>>>>>>>>>    heartbeat message from TM is not timely handled before the timeout check.
>>>>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you~
>>>>>>>>>>>>
>>>>>>>>>>>> Xintong Song
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <
>>>>>>>>>>>> ori.psk@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>>>>>>>> partitions and I have parallelism of 189.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently running with RocksDB, with checkpointing disabled.
>>>>>>>>>>>>> My state size is approx. 500 GB.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out"
>>>>>>>>>>>>> errors for no apparent reason.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I checked the container that gets the timeout for GC pauses,
>>>>>>>>>>>>> heap memory, direct memory, mapped memory, off-heap memory, CPU load,
>>>>>>>>>>>>> network load, total out-records, total in-records, backpressure, and
>>>>>>>>>>>>> everything else I can think of. All those metrics show nothing
>>>>>>>>>>>>> unusual: the container has roughly average values for all of them,
>>>>>>>>>>>>> and a lot of other containers score higher.
>>>>>>>>>>>>>
>>>>>>>>>>>>> All the metrics are very low because every TaskManager runs
>>>>>>>>>>>>> alone on an r5.2xlarge machine.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've been trying to debug this for days and I cannot find any
>>>>>>>>>>>>> explanation for it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can someone explain why it's happening?
>>>>>>>>>>>>>
>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>

Re: Heartbeat of TaskManager timed out.

Posted by Xintong Song <to...@gmail.com>.
Thanks for the updates, Ori.

I'm not familiar with Scala. Just curious, if what you suspect is true, is
it a bug of Scala?

Thank you~

Xintong Song



On Tue, Jul 7, 2020 at 1:41 PM Ori Popowski <or...@gmail.com> wrote:

> Hi,
>
> I just wanted to update that the problem is now solved!
>
> I suspect that Scala's flatten() method has a memory problem on very
> large lists (> 2 billion elements). When using Scala Lists, the memory
> seems to leak but the app keeps running, and when using Scala Vectors, a
> weird IllegalArgumentException is thrown [1].
>
> I implemented my own flatten() method using Arrays and quickly ran into
> a NegativeArraySizeException, since the integer representing the array
> size wrapped around at Int.MaxValue and became negative. After I started
> catching this exception, all my cluster problems were resolved:
> checkpoints, the heartbeat timeout, and also the memory and CPU
> utilization.
>
> I still need to confirm my suspicion towards Scala's flatten() though,
> since I haven't "lab-tested" it.
>
> [1] https://github.com/NetLogo/NetLogo/issues/1830
>
> On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski <or...@gmail.com> wrote:
>
>> Hi,
>>
>> I initially thought so too, which is why my heap is almost 30 GiB.
>> However, I started to analyze the Java Flight Recorder files, and I
>> suspect there's a memory leak in Scala's flatten() method.
>> I changed the line that uses flatten(): instead of calling flatten() I'm
>> just creating a byte array of the size flatten() would have returned, and
>> I no longer have the heartbeat problem.
>>
>> So now my code is
>>     val recordingData =
>>       Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>>
>> instead of
>>     val recordingData = recordingBytes.flatten
>>
>> I attach a screenshot of Java Mission Control
>>
>>
>>
>> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <to...@gmail.com>
>> wrote:
>>
>>> I agree with Roman's suggestion for increasing heap size.
>>>
>>> It seems that the heap fills up faster than it is freed. Eventually a
>>> Full GC is triggered, taking more than 50s and causing the timeout.
>>> However, even the Full GC frees only about 3GB out of the 28GB max size.
>>> That probably suggests that the max heap size is not sufficient.
>>>
>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)  28944M->26018M(28960M), 51.5256128 secs]
>>>>     [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>
>>>
>>> I would not be so sure about the memory leak. I think it could be a
>>> normal pattern that memory keeps growing as more data is processed. E.g.,
>>> from the provided log, I see window operation tasks executed in the task
>>> manager. Such operation might accumulate data until the window is emitted.
>>>
>>> Maybe Ori you can also take a look at the task manager log when the job
>>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>>> I mentioned before, it is possible that, with the same configurations Flink
>>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>>> changes.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <or...@gmail.com> wrote:
>>>
>>>> Thank you very much for your analysis.
>>>>
>>>> When I said there was no memory leak, I meant in the specific
>>>> TaskManager I monitored in real time using JProfiler.
>>>> Unfortunately, this problem occurs in only one of the TaskManagers, and
>>>> you cannot anticipate which. So when you pick a TM to profile at random,
>>>> everything looks fine.
>>>>
>>>> I'm running the job again with Java Flight Recorder now, and I hope I'll
>>>> find the reason for the memory leak.
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>>> khachatryan.roman@gmail.com> wrote:
>>>>
>>>>> Thanks, Ori
>>>>>
>>>>> From the log, it looks like there IS a memory leak.
>>>>>
>>>>> At 10:12:53 there was the last "successful" GC, when 13GB was freed in
>>>>> 0.4653809 secs:
>>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>>
>>>>> Then the heap grew from 10G to 28G with GC not being able to free up
>>>>> enough space:
>>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap: 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap: 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>>>> ... ...
>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap: 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>>>
>>>>> Until 10:15:12, when a Full GC freed almost 3G, but it took 51 seconds
>>>>> and the heartbeat timed out:
>>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)  28944M->26018M(28960M), 51.5256128 secs]
>>>>>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
>>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of JobManager with id bc59ba6a
>>>>>
>>>>> No substantial amount of memory was freed after that.
>>>>>
>>>>> If this memory usage pattern is expected, I'd suggest:
>>>>> 1. increasing the heap size
>>>>> 2. playing with the PrintStringDeduplicationStatistics and
>>>>> UseStringDeduplication flags - string deduplication is probably making
>>>>> G1 slower than CMS
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <or...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <to...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Maybe you can share the log and gc-log of the problematic
>>>>>>> TaskManager? See if we can find any clue.
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I've found that one of my TaskManagers occasionally experiences a
>>>>>>>> GC pause of 40-50 seconds, and I have no idea why.
>>>>>>>> I profiled one of the machines using JProfiler and everything looks
>>>>>>>> fine: no memory leaks, and memory usage is low.
>>>>>>>> However, I cannot predict which machine will get the 40-50 second
>>>>>>>> pause, and I cannot profile all of them all the time.
>>>>>>>>
>>>>>>>> Any suggestions?
>>>>>>>>
>>>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In Flink 1.10, there's a huge change in the memory management
>>>>>>>>> compared to previous versions. This could be related to your observations,
>>>>>>>>> because with the same configurations, it is possible that there's less JVM
>>>>>>>>> heap space (with more off-heap memory). Please take a look at this
>>>>>>>>> migration guide [1].
>>>>>>>>>
>>>>>>>>> Thank you~
>>>>>>>>>
>>>>>>>>> Xintong Song
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>>>>>
>>>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>
>>>>>>>>>> > i recently tried 1.10 and see this error frequently. and i dont
>>>>>>>>>> have the same issue when running with 1.9.1
>>>>>>>>>> I did downgrade to Flink 1.9 and there's certainly no change in
>>>>>>>>>> the occurrences in the heartbeat timeout
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>    querying services.
>>>>>>>>>>    - You may also look into the status of the JM process. If JM
>>>>>>>>>>    is under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>>>>>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>>>>>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>>>>>>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>>>>>>>>> the case.
>>>>>>>>>>
>>>>>>>>>> Does anyone know what can cause high GC time and how to mitigate
>>>>>>>>>> this?
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <
>>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ori,
>>>>>>>>>>>
>>>>>>>>>>> Here are some suggestions from my side.
>>>>>>>>>>>
>>>>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>>    querying services.
>>>>>>>>>>>    - You may also look into the status of the JM process. If JM
>>>>>>>>>>>    is under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thank you~
>>>>>>>>>>>
>>>>>>>>>>> Xintong Song
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>>>>>>> partitions and I have parallelism of 189.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently running with RocksDB, with checkpointing disabled. My
>>>>>>>>>>>> state size is approx. 500 GB.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out"
>>>>>>>>>>>> errors for no apparent reason.
>>>>>>>>>>>>
>>>>>>>>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>>>>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>>>>>>>>> load, total out-records, total in-records, backpressure, and everything I
>>>>>>>>>>>> can think of. But all those metrics show that there's nothing unusual, and
>>>>>>>>>>>> it has around average values for all those metrics. There are a lot of
>>>>>>>>>>>> other containers which score higher.
>>>>>>>>>>>>
>>>>>>>>>>>> All the metrics are very low because every TaskManager runs on
>>>>>>>>>>>> a r5.2xlarge machine alone.
>>>>>>>>>>>>
>>>>>>>>>>>> I've been trying to debug this for days and I cannot find any
>>>>>>>>>>>> explanation for it.
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone explain why it's happening?
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager
>>>>>>>>>>>> with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>>>>>     at org.apache.flink.runtime.jobmaster.
>>>>>>>>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(
>>>>>>>>>>>> JobMaster.java:1147)
>>>>>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl
>>>>>>>>>>>> .run(HeartbeatMonitorImpl.java:109)
>>>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(
>>>>>>>>>>>> Executors.java:511)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>>>>>> .handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>>>>>> .handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>>>>>>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>>>>>> .handleMessage(AkkaRpcActor.java:152)
>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements
>>>>>>>>>>>> .scala:26)
>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements
>>>>>>>>>>>> .scala:21)
>>>>>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction
>>>>>>>>>>>> .scala:123)
>>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(
>>>>>>>>>>>> CaseStatements.scala:21)
>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>>>>>> .scala:170)
>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>>>>>> .scala:171)
>>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>>>>>> .scala:171)
>>>>>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor
>>>>>>>>>>>> .scala:225)
>>>>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask
>>>>>>>>>>>> .java:260)
>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>>>>>>>> ForkJoinPool.java:1339)
>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(
>>>>>>>>>>>> ForkJoinPool.java:1979)
>>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>>>>>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>

Re: Heartbeat of TaskManager timed out.

Posted by Ori Popowski <or...@gmail.com>.
Hi,

I just wanted to update that the problem is now solved!

I suspect that Scala's flatten() method has a memory problem on very large
lists (> 2 billion elements). When using Scala Lists, the memory seems to
leak but the app keeps running, and when using Scala Vectors, a weird
IllegalArgumentException is thrown [1].

I implemented my own flatten() method using Arrays and quickly ran into
NegativeArraySizeException, since the Int representing the array size
wrapped around at Int.MaxValue and became negative. After I started
catching this exception, all my cluster problems were resolved: checkpoints,
the heartbeat timeout, and also the memory and CPU utilization.

I still need to confirm my suspicion towards Scala's flatten() though,
since I haven't "lab-tested" it.
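
The wrap-around described above can be reproduced in isolation. The
following is only a minimal sketch, not the code from my job: the names
LengthOverflowSketch, totalLength and safeArraySize are made up for
illustration, and it assumes all chunk lengths are non-negative. Note also
that some JVMs cap array length slightly below Int.MaxValue, so the check
here is a necessary condition rather than a guarantee.

```scala
object LengthOverflowSketch {
  // Sum the chunk lengths as Long so the total cannot wrap around.
  def totalLength(lengths: Seq[Int]): Long =
    lengths.foldLeft(0L)(_ + _)

  // Return the total as an array size, or None if it cannot fit in an Int.
  def safeArraySize(lengths: Seq[Int]): Option[Int] = {
    val total = totalLength(lengths)
    if (total <= Int.MaxValue) Some(total.toInt) else None
  }

  def main(args: Array[String]): Unit = {
    val lengths = Seq(Int.MaxValue, 1)
    println(lengths.sum)            // Int arithmetic wraps to a negative value
    println(totalLength(lengths))   // Long arithmetic keeps the true total
    println(safeArraySize(lengths)) // None: too large for a single array
  }
}
```

Checking the size up front turns the NegativeArraySizeException into an
explicit "too large" case that the caller can handle.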

[1] https://github.com/NetLogo/NetLogo/issues/1830

On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski <or...@gmail.com> wrote:

> Hi,
>
> I initially thought so too, which is why my heap is almost 30 GiB.
> However, I started to analyze the Java Flight Recorder files, and I
> suspect there's a memory leak in Scala's flatten() method.
> I changed the line that uses flatten(), and instead of flatten() I'm just
> creating a byte array of the size flatten() would have returned, and I no
> longer have the heartbeat problem.
>
> So now my code is
>     val recordingData =
> Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>
> instead of
>     val recordingData = recordingBytes.flatten
>
> I attach a screenshot of Java Mission Control
>
>
>
> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <to...@gmail.com> wrote:
>
>> I agree with Roman's suggestion for increasing heap size.
>>
>> It seems that the heap grows faster than it is freed. Thus eventually a Full
>> GC is triggered, taking more than 50s and causing the timeout. However,
>> even the full GC frees less than 3GB out of the 28GB max size. That
>> probably suggests that the max heap size is not sufficient.
>>
>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>  28944M->26018M(28960M), 51.5256128 secs]
>>>     [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>> 113556K->112729K(1150976K)]
>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>
>>
>> I would not be so sure about the memory leak. I think it could be a
>> normal pattern that memory keeps growing as more data is processed. E.g.,
>> from the provided log, I see window operation tasks executed in the task
>> manager. Such operation might accumulate data until the window is emitted.
>>
>> Maybe Ori you can also take a look at the task manager log when the job
>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>> I mentioned before, it is possible that, with the same configurations Flink
>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>> changes.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <or...@gmail.com> wrote:
>>
>>> Thank you very much for your analysis.
>>>
>>> When I said there was no memory leak - I meant that from the specific
>>> TaskManager I monitored in real-time using JProfiler.
>>> Unfortunately, this problem occurs in only one of the TaskManagers, and you
>>> cannot anticipate which. So when you pick a TM to profile at random,
>>> everything looks fine.
>>>
>>> I'm running the job again with Java Flight Recorder now, and I hope I'll
>>> find the reason for the memory leak.
>>>
>>> Thanks!
>>>
>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>> khachatryan.roman@gmail.com> wrote:
>>>
>>>> Thanks, Ori
>>>>
>>>> From the log, it looks like there IS a memory leak.
>>>>
>>>> At 10:12:53 there was the last "successful" GC, when 13 GB was freed in
>>>> 0.4653809 secs:
>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
>>>> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>
>>>> Then the heap grew from 10G to 28G with GC not being able to free up
>>>> enough space:
>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
>>>> 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
>>>> 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
>>>> 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>>> ... ...
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
>>>> 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
>>>> 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
>>>> 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>>
>>>> Until 10:15:12, when a full GC freed almost 3G, but it took 51 seconds and
>>>> the heartbeat timed out:
>>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>>  28944M->26018M(28960M), 51.5256128 secs]
>>>>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>>> 113556K->112729K(1150976K)]
>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
>>>> JobManager with id bc59ba6a
>>>>
>>>> No substantial amount of memory was freed after that.
>>>>
>>>> If this memory usage pattern is expected, I'd suggest to:
>>>> 1. increase heap size
>>>> 2. play with PrintStringDeduplicationStatistics and
>>>> UseStringDeduplication flags - probably string deduplication is making G1
>>>> slower than CMS
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <or...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Maybe you can share the log and gc-log of the problematic
>>>>>> TaskManager? See if we can find any clue.
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've found out that sometimes one of my TaskManagers experiences a
>>>>>>> GC pause of 40-50 seconds and I have no idea why.
>>>>>>> I profiled one of the machines using JProfiler and everything looks
>>>>>>> fine. No memory leaks, memory is low.
>>>>>>> However, I cannot anticipate which of the machines will get the
>>>>>>> 40-50 seconds pause and I also cannot profile all of them all the time.
>>>>>>>
>>>>>>> Any suggestions?
>>>>>>>
>>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In Flink 1.10, there's a huge change in the memory management
>>>>>>>> compared to previous versions. This could be related to your observations,
>>>>>>>> because with the same configurations, it is possible that there's less JVM
>>>>>>>> heap space (with more off-heap memory). Please take a look at this
>>>>>>>> migration guide [1].
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>>>>
>>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>
>>>>>>>>> > i recently tried 1.10 and see this error frequently. and i dont
>>>>>>>>> have the same issue when running with 1.9.1
>>>>>>>>> I did downgrade to Flink 1.9 and there's certainly no change in
>>>>>>>>> the occurrences in the heartbeat timeout
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>    querying services.
>>>>>>>>>    - You may also look into the status of the JM process. If JM
>>>>>>>>>    is under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>>>>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>>>>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>>>>>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>>>>>>>> the case.
>>>>>>>>>
>>>>>>>>> Does anyone know what can cause high GC time and how to mitigate
>>>>>>>>> this?
>>>>>>>>>
>>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <
>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ori,
>>>>>>>>>>
>>>>>>>>>> Here are some suggestions from my side.
>>>>>>>>>>
>>>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>>    querying services.
>>>>>>>>>>    - You may also look into the status of the JM process. If JM
>>>>>>>>>>    is under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>>>>>> partitions and I have parallelism of 189.
>>>>>>>>>>>
>>>>>>>>>>> Currently running with RocksDB, with checkpointing disabled. My
>>>>>>>>>>> state size is approx. 500 GB.
>>>>>>>>>>>
>>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors
>>>>>>>>>>> for no apparent reason.
>>>>>>>>>>>
>>>>>>>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>>>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>>>>>>>> load, total out-records, total in-records, backpressure, and everything I
>>>>>>>>>>> can think of. But all those metrics show that there's nothing unusual, and
>>>>>>>>>>> it has around average values for all those metrics. There are a lot of
>>>>>>>>>>> other containers which score higher.
>>>>>>>>>>>
>>>>>>>>>>> All the metrics are very low because every TaskManager runs on a
>>>>>>>>>>> r5.2xlarge machine alone.
>>>>>>>>>>>
>>>>>>>>>>> I've been trying to debug this for days and I cannot find any
>>>>>>>>>>> explanation for it.
>>>>>>>>>>>
>>>>>>>>>>> Can someone explain why it's happening?
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager
>>>>>>>>>>> with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>>>>     at org.apache.flink.runtime.jobmaster.
>>>>>>>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(
>>>>>>>>>>> JobMaster.java:1147)
>>>>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl
>>>>>>>>>>> .run(HeartbeatMonitorImpl.java:109)
>>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(
>>>>>>>>>>> Executors.java:511)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>>>>> .handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>>>>> .handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>>>>>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>>>>> .handleMessage(AkkaRpcActor.java:152)
>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements
>>>>>>>>>>> .scala:26)
>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements
>>>>>>>>>>> .scala:21)
>>>>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction
>>>>>>>>>>> .scala:123)
>>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements
>>>>>>>>>>> .scala:21)
>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>>>>> .scala:170)
>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>>>>> .scala:171)
>>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>>>>> .scala:171)
>>>>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor
>>>>>>>>>>> .scala:225)
>>>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask
>>>>>>>>>>> .java:260)
>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>>>>>>> ForkJoinPool.java:1339)
>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(
>>>>>>>>>>> ForkJoinPool.java:1979)
>>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>>>>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>

Re: Heartbeat of TaskManager timed out.

Posted by Ori Popowski <or...@gmail.com>.
Hi,

I initially thought so too, which is why my heap is almost 30 GiB.
However, I started to analyze the Java Flight Recorder files, and I suspect
there's a memory leak in Scala's flatten() method.
I changed the line that uses flatten(), and instead of flatten() I'm just
creating a byte array of the size flatten() would have returned, and I no
longer have the heartbeat problem.

So now my code is
    val recordingData =
Array.fill[Byte](recordingBytes.map(_.length).sum)(0)

instead of
    val recordingData = recordingBytes.flatten
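
Note that the Array.fill line only allocates a zero-filled array of the
same size; it does not copy any data. For comparison, here is a rough
sketch of a replacement that does copy the chunks into one preallocated
array. It is not what runs in my job: ConcatSketch and concat are
hypothetical names, and it assumes recordingBytes is a Seq[Array[Byte]].

```scala
object ConcatSketch {
  // Concatenate byte chunks into a single array with one allocation.
  def concat(chunks: Seq[Array[Byte]]): Array[Byte] = {
    // Sum lengths as Long so an oversized total is detected, not wrapped.
    val total = chunks.foldLeft(0L)(_ + _.length)
    require(total <= Int.MaxValue, s"combined size $total does not fit in one array")
    val out = new Array[Byte](total.toInt)
    var offset = 0
    for (chunk <- chunks) {
      // Bulk copy avoids building intermediate collections per element.
      System.arraycopy(chunk, 0, out, offset, chunk.length)
      offset += chunk.length
    }
    out
  }
}
```

Usage would be along the lines of
    val recordingData = ConcatSketch.concat(recordingBytes)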

I attach a screenshot of Java Mission Control



On Fri, Jul 3, 2020 at 7:24 AM Xintong Song <to...@gmail.com> wrote:

> I agree with Roman's suggestion for increasing heap size.
>
> It seems that the heap grows faster than it is freed. Thus eventually a Full
> GC is triggered, taking more than 50s and causing the timeout. However,
> even the full GC frees less than 3GB out of the 28GB max size. That
> probably suggests that the max heap size is not sufficient.
>
>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>  28944M->26018M(28960M), 51.5256128 secs]
>>     [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>> 113556K->112729K(1150976K)]
>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>
>
> I would not be so sure about the memory leak. I think it could be a normal
> pattern that memory keeps growing as more data is processed. E.g., from the
> provided log, I see window operation tasks executed in the task manager.
> Such operation might accumulate data until the window is emitted.
>
> Maybe Ori you can also take a look at the task manager log when the job
> runs with Flink 1.9 without this problem, see how the heap size changed. As
> I mentioned before, it is possible that, with the same configurations Flink
> 1.10 has less heap size compared to Flink 1.9, due to the memory model
> changes.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski <or...@gmail.com> wrote:
>
>> Thank you very much for your analysis.
>>
>> When I said there was no memory leak - I meant that from the specific
>> TaskManager I monitored in real-time using JProfiler.
>> Unfortunately, this problem occurs in only one of the TaskManagers, and you
>> cannot anticipate which. So when you pick a TM to profile at random,
>> everything looks fine.
>>
>> I'm running the job again with Java Flight Recorder now, and I hope I'll
>> find the reason for the memory leak.
>>
>> Thanks!
>>
>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>> khachatryan.roman@gmail.com> wrote:
>>
>>> Thanks, Ori
>>>
>>> From the log, it looks like there IS a memory leak.
>>>
>>> At 10:12:53 there was the last "successful" GC, when 13 GB was freed in
>>> 0.4653809 secs:
>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
>>> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>
>>> Then the heap grew from 10G to 28G with GC not being able to free up
>>> enough space:
>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
>>> 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
>>> 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
>>> 12929.0M(28960.0M)->12467.0M(28960.0M)]
>>> ... ...
>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
>>> 28042.6M(28960.0M)->27220.6M(28960.0M)]
>>> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
>>> 28494.5M(28960.0M)->28720.6M(28960.0M)]
>>> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
>>> 28944.6M(28960.0M)->28944.6M(28960.0M)]
>>>
>>> Until 10:15:12, when a full GC freed almost 3G, but it took 51 seconds and
>>> the heartbeat timed out:
>>> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>>>  28944M->26018M(28960M), 51.5256128 secs]
>>>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>> 113556K->112729K(1150976K)]
>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
>>> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
>>> JobManager with id bc59ba6a
>>>
>>> No substantial amount of memory was freed after that.
>>>
>>> If this memory usage pattern is expected, I'd suggest to:
>>> 1. increase heap size
>>> 2. play with PrintStringDeduplicationStatistics and
>>> UseStringDeduplication flags - probably string deduplication is making G1
>>> slower than CMS
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <or...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Maybe you can share the log and gc-log of the problematic TaskManager?
>>>>> See if we can find any clue.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com> wrote:
>>>>>
>>>>>> I've found out that sometimes one of my TaskManagers experiences a GC
>>>>>> pause of 40-50 seconds and I have no idea why.
>>>>>> I profiled one of the machines using JProfiler and everything looks
>>>>>> fine. No memory leaks, memory is low.
>>>>>> However, I cannot anticipate which of the machines will get the 40-50
>>>>>> seconds pause and I also cannot profile all of them all the time.
>>>>>>
>>>>>> Any suggestions?
>>>>>>
>>>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> In Flink 1.10, there's a huge change in the memory management
>>>>>>> compared to previous versions. This could be related to your observations,
>>>>>>> because with the same configurations, it is possible that there's less JVM
>>>>>>> heap space (with more off-heap memory). Please take a look at this
>>>>>>> migration guide [1].
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>>>
>>>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the suggestions!
>>>>>>>>
>>>>>>>> > i recently tried 1.10 and see this error frequently. and i dont
>>>>>>>> have the same issue when running with 1.9.1
>>>>>>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>>>>>>> occurrences in the heartbeat timeout
>>>>>>>>
>>>>>>>>
>>>>>>>> >
>>>>>>>>
>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>    - It might be helpful to share your configuration setups (e.g.,
>>>>>>>>    the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is
>>>>>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>    querying services.
>>>>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>    - Are there any metrics monitoring the network condition between
>>>>>>>>    the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>
>>>>>>>>
>>>>>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>>>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>>>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>>>>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>>>>>>> the case.
>>>>>>>>
>>>>>>>> Does anyone know what can cause high GC time and how to mitigate
>>>>>>>> this?
>>>>>>>>
>>>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <to...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ori,
>>>>>>>>>
>>>>>>>>> Here are some suggestions from my side.
>>>>>>>>>
>>>>>>>>>    - Probably the most straightforward way is to try increasing
>>>>>>>>>    the timeout to see if that helps. You can leverage the configuration option
>>>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>>>    - It might be helpful to share your configuration setups
>>>>>>>>>    (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest
>>>>>>>>>    way is to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>>>    parameters and all the loaded configurations.
>>>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>>>    querying services.
>>>>>>>>>    - You may also look into the status of the JM process. If JM
>>>>>>>>>    is under significant GC pressure, it could also happen that the heartbeat
>>>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>>>    - Are there any metrics monitoring the network condition
>>>>>>>>>    between the JM and the timed-out TM? Possibly any jitter?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you~
>>>>>>>>>
>>>>>>>>> Xintong Song
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>>>
>>>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>>>>> partitions and I have parallelism of 189.
>>>>>>>>>>
>>>>>>>>>> Currently running with RocksDB, with checkpointing disabled. My
>>>>>>>>>> state size is approx. 500 GB.
>>>>>>>>>>
>>>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors
>>>>>>>>>> for no apparent reason.
>>>>>>>>>>
>>>>>>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>>>>>>> load, total out-records, total in-records, backpressure, and everything I
>>>>>>>>>> can think of. But all those metrics show that there's nothing unusual, and
>>>>>>>>>> it has around average values for all those metrics. There are a lot of
>>>>>>>>>> other containers which score higher.
>>>>>>>>>>
>>>>>>>>>> All the metrics are very low because every TaskManager runs on a
>>>>>>>>>> r5.2xlarge machine alone.
>>>>>>>>>>
>>>>>>>>>> I've been trying to debug this for days and I cannot find any
>>>>>>>>>> explanation for it.
>>>>>>>>>>
>>>>>>>>>> Can someone explain why it's happening?
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>

Re: Heartbeat of TaskManager timed out.

Posted by Xintong Song <to...@gmail.com>.
I agree with Roman's suggestion to increase the heap size.

It seems that the heap fills up faster than it is freed. Eventually a Full GC
is triggered, which takes more than 50s and causes the timeout. However, even
the Full GC frees only about 3GB out of the 28GB max heap size. That probably
suggests that the max heap size is not sufficient.

> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>  28944M->26018M(28960M), 51.5256128 secs]
>     [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
> 113556K->112729K(1150976K)]
>   [Times: user=91.08 sys=0.06, real=51.53 secs]
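
To make the comparison with the heartbeat timeout concrete, here is a minimal Python sketch that pulls the pause time and the freed heap out of the Full GC entry quoted above. The regular expression is written only against the G1 log format that appears in this thread, the 50s threshold is the heartbeat.timeout default mentioned earlier in the thread, and the function and constant names are made up for illustration.

```python
import re

# Assumption: the entry has the shape quoted in this thread. It may be wrapped
# across lines, so whitespace is matched loosely.
FULL_GC_RE = re.compile(
    r"\[Full GC \(([^)]*)\)\s+(\d+)M->(\d+)M\((\d+)M\),\s*([\d.]+)\s+secs\]"
)

# Default of heartbeat.timeout as stated in this thread (50s).
HEARTBEAT_TIMEOUT_SECS = 50.0


def parse_full_gc(entry):
    """Return a dict describing a Full GC entry, or None if it does not match."""
    match = FULL_GC_RE.search(entry)
    if match is None:
        return None
    cause, before, after, capacity, pause = match.groups()
    return {
        "cause": cause,
        "freed_mb": int(before) - int(after),
        "occupancy_after": int(after) / int(capacity),
        "pause_secs": float(pause),
    }


entry = (
    "2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)\n"
    " 28944M->26018M(28960M), 51.5256128 secs]"
)
gc = parse_full_gc(entry)
print(gc["freed_mb"], "MB freed in", gc["pause_secs"], "s")
print("heap still", round(gc["occupancy_after"] * 100), "% full after the Full GC")
print("pause exceeds heartbeat timeout:", gc["pause_secs"] > HEARTBEAT_TIMEOUT_SECS)
```

For this entry the sketch reports 2926 MB freed, a heap that is still about 90% full, and a pause slightly longer than the 50s timeout.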


I would not be so sure about the memory leak. I think it could be a normal
pattern that memory keeps growing as more data is processed. E.g., from the
provided log, I see window operation tasks executed in the task manager.
Such operations might accumulate data until the window is emitted.

Ori, maybe you can also take a look at the task manager log from a run with
Flink 1.9 that did not show this problem, and see how the heap size changed. As
I mentioned before, it is possible that, with the same configuration, Flink
1.10 has a smaller heap than Flink 1.9, due to the memory model
changes.
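
As a rough illustration of why the heap can shrink, the following Python sketch estimates the JVM heap that the 1.10 memory model derives from a configured total process size. The fractions and min/max values are assumed to match the defaults in the Flink 1.10 documentation and should be checked against the version in use; the metaspace size in particular differs between releases, so it is a required argument. The function names are hypothetical.

```python
def clamp(value, low, high):
    """Limit value to the range [low, high]."""
    return max(low, min(value, high))


def estimate_jvm_heap_mb(
    process_mb,                   # taskmanager.memory.process.size
    metaspace_mb,                 # taskmanager.memory.jvm-metaspace.size
    overhead_fraction=0.1,        # taskmanager.memory.jvm-overhead.fraction
    overhead_min_mb=192,          # assumed default minimum
    overhead_max_mb=1024,         # assumed default maximum
    managed_fraction=0.4,         # taskmanager.memory.managed.fraction
    network_fraction=0.1,         # taskmanager.memory.network.fraction
    network_min_mb=64,            # assumed default minimum
    network_max_mb=1024,          # assumed default maximum
    framework_heap_mb=128,        # taskmanager.memory.framework.heap.size
    framework_off_heap_mb=128,    # taskmanager.memory.framework.off-heap.size
    task_off_heap_mb=0,           # taskmanager.memory.task.off-heap.size
):
    """Estimate the JVM heap (framework heap + task heap) in MB."""
    overhead = clamp(process_mb * overhead_fraction, overhead_min_mb, overhead_max_mb)
    flink_total = process_mb - overhead - metaspace_mb
    # Managed memory is off-heap in 1.10, so it is subtracted from the heap budget.
    managed = flink_total * managed_fraction
    network = clamp(flink_total * network_fraction, network_min_mb, network_max_mb)
    task_heap = (flink_total - managed - network
                 - framework_heap_mb - framework_off_heap_mb - task_off_heap_mb)
    return task_heap + framework_heap_mb


# Illustrative values only: a 1568 MB process with a 96 MB metaspace.
heap_mb = estimate_jvm_heap_mb(1568, 96)
print("estimated JVM heap:", heap_mb, "MB of", 1568, "MB")
```

With these illustrative values only about a third of the process size ends up as JVM heap, because managed memory, network buffers, metaspace and JVM overhead are all carved out first.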

Thank you~

Xintong Song




Re: Heartbeat of TaskManager timed out.

Posted by Ori Popowski <or...@gmail.com>.
Thank you very much for your analysis.

When I said there was no memory leak, I meant that there was none in the
specific TaskManager I monitored in real time using JProfiler.
Unfortunately, this problem occurs in only one of the TaskManagers, and you
cannot anticipate which. So when you pick a TM to profile at random,
everything looks fine.

I'm running the job again with Java Flight Recorder now, and I hope I'll
find the reason for the memory leak.

Thanks!


Re: Heartbeat of TaskManager timed out.

Posted by Khachatryan Roman <kh...@gmail.com>.
Thanks, Ori

From the log, it looks like there IS a memory leak.

At 10:12:53 there was the last "successful" GC, when 13GB was freed in
0.4653809 secs:
[Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap:
23280.3M(28960.0M)->10047.0M(28960.0M)]

Then the heap grew from 10G to 28G with GC not being able to free up enough
space:
[Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
12591.0M(28960.0M)->11247.0M(28960.0M)]
[Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
12103.0M(28960.0M)->11655.0M(28960.0M)]
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
12929.0M(28960.0M)->12467.0M(28960.0M)]
... ...
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
28042.6M(28960.0M)->27220.6M(28960.0M)]
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
28494.5M(28960.0M)->28720.6M(28960.0M)]
[Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
28944.6M(28960.0M)->28944.6M(28960.0M)]

Until 10:15:12, when a Full GC freed almost 3G, but it took 51 seconds and the
heartbeat timed out:
2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
 28944M->26018M(28960M), 51.5256128 secs]
  [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
113556K->112729K(1150976K)]
  [Times: user=91.08 sys=0.06, real=51.53 secs]
2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
JobManager with id bc59ba6a

No substantial amount of memory was freed after that.
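
The trend in the young-collection entries above can be checked mechanically. The following is a minimal Python sketch, assuming entries in exactly the format quoted in this message (sizes in M, possibly wrapped across lines); the helper name is made up for illustration.

```python
import re

# Assumption: heap summaries look like "Heap: used(capacity)->used(capacity)"
# with all sizes in M, as in the entries quoted above.
HEAP_RE = re.compile(r"Heap:\s*([\d.]+)M\([\d.]+M\)->([\d.]+)M\(([\d.]+)M\)")


def heap_summaries(log_text):
    """Return (before_mb, after_mb, capacity_mb) for each heap summary found."""
    return [tuple(float(v) for v in m.groups()) for m in HEAP_RE.finditer(log_text)]


log_text = """
[Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap:
23280.3M(28960.0M)->10047.0M(28960.0M)]
[Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
12591.0M(28960.0M)->11247.0M(28960.0M)]
[Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
12103.0M(28960.0M)->11655.0M(28960.0M)]
[Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
12929.0M(28960.0M)->12467.0M(28960.0M)]
"""

samples = heap_summaries(log_text)
for before, after, capacity in samples:
    print(f"freed {before - after:8.1f}M, heap after GC {after / capacity:6.1%}")

# Occupancy after each collection that only ever grows is the pattern described
# above; it is consistent with a leak but does not prove one.
after_values = [after for _, after, _ in samples]
print("heap after GC keeps growing:", after_values == sorted(after_values))
```

Running the same check over a full GC log from each TaskManager would show which ones follow this pattern without having to attach a profiler to all of them.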

If this memory usage pattern is expected, I'd suggest:
1. increasing the heap size
2. playing with the PrintStringDeduplicationStatistics and
UseStringDeduplication flags; string deduplication is probably making G1
slower than CMS

Regards,
Roman


On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <or...@gmail.com> wrote:

> Hi,
>
> I'd be happy to :) Attached is a TaskManager log which timed out.
>
>
> Thanks!
>
> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <to...@gmail.com> wrote:
>
>> Maybe you can share the log and gc-log of the problematic TaskManager?
>> See if we can find any clue.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com> wrote:
>>
>>> I've found out that sometimes one of my TaskManagers experiences a GC
>>> pause of 40-50 seconds and I have no idea why.
>>> I profiled one of the machines using JProfiler and everything looks
>>> fine. No memory leaks, memory is low.
>>> However, I cannot anticipate which of the machines will get the 40-50
>>> seconds pause and I also cannot profile all of them all the time.
>>>
>>> Any suggestions?
>>>
>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com>
>>> wrote:
>>>
>>>> In Flink 1.10, there's a huge change in the memory management compared
>>>> to previous versions. This could be related to your observations, because
>>>> with the same configurations, it is possible that there's less JVM heap
>>>> space (with more off-heap memory). Please take a look at this migration
>>>> guide [1].
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>
>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for the suggestions!
>>>>>
>>>>> > i recently tried 1.10 and see this error frequently. and i dont have
>>>>> the same issue when running with 1.9.1
>>>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>>>> occurrences in the heartbeat timeout
>>>>>
>>>>>
>>>>> >
>>>>>
>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>    timeout to see if that helps. You can leverage the configuration option
>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>    - It might be helpful to share your configuration setups (e.g.,
>>>>>    the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is
>>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>>    parameters and all the loaded configurations.
>>>>>    - You may want to look into the GC logs in addition to the
>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>    most recent metrics due to the process not responding to the metric
>>>>>    querying services.
>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>>    message from TM is not timely handled before the timeout check.
>>>>>    - Is there any metrics monitoring the network condition between
>>>>>    the JM and timeouted TM? Possibly any jitters?
>>>>>
>>>>>
>>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>>>> the case.
>>>>>
>>>>> Does anyone know what can cause high GC time and how to mitigate this?
>>>>>
>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ori,
>>>>>>
>>>>>> Here are some suggestions from my side.
>>>>>>
>>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>>    timeout to see if that helps. You can leverage the configuration option
>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>    - It might be helpful to share your configuration setups (e.g.,
>>>>>>    the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is
>>>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>>>    parameters and all the loaded configurations.
>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>    querying services.
>>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>    - Is there any metrics monitoring the network condition between
>>>>>>    the JM and timeouted TM? Possibly any jitters?
>>>>>>
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>> partitions and I have parallelism of 189.
>>>>>>>
>>>>>>> Currently running with RocksDB, with checkpointing disabled. My
>>>>>>> state size is appx. 500gb.
>>>>>>>
>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors
>>>>>>> with no apparent reason.
>>>>>>>
>>>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>>>> load, total out-records, total in-records, backpressure, and everything I
>>>>>>> can think of. But all those metrics show that there's nothing unusual, and
>>>>>>> it has around average values for all those metrics. There are a lot of
>>>>>>> other containers which score higher.
>>>>>>>
>>>>>>> All the metrics are very low because every TaskManager runs on a
>>>>>>> r5.2xlarge machine alone.
>>>>>>>
>>>>>>> I'm trying to debug this for days and I cannot find any explanation
>>>>>>> for it.
>>>>>>>
>>>>>>> Can someone explain why it's happening?
>>>>>>>
>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager
>>>>>>> with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>     at org.apache.flink.runtime.jobmaster.
>>>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(
>>>>>>> JobMaster.java:1147)
>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>>>>>>> HeartbeatMonitorImpl.java:109)
>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>>>>>>> .java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>> .handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>> .handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>>>>>> AkkaRpcActor.java:152)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction
>>>>>>> .scala:123)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements
>>>>>>> .scala:21)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>> .scala:170)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>> .scala:171)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>> .scala:171)
>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:
>>>>>>> 225)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:
>>>>>>> 260)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>>> ForkJoinPool.java:1339)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool
>>>>>>> .java:1979)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>

Re: Heartbeat of TaskManager timed out.

Posted by Ori Popowski <or...@gmail.com>.
Hi,

I'd be happy to :) Attached is a TaskManager log which timed out.


Thanks!

On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <to...@gmail.com> wrote:

> Maybe you can share the log and gc-log of the problematic TaskManager? See
> if we can find any clue.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com> wrote:
>
>> I've found out that sometimes one of my TaskManagers experiences a GC
>> pause of 40-50 seconds and I have no idea why.
>> I profiled one of the machines using JProfiler and everything looks fine.
>> No memory leaks, memory is low.
>> However, I cannot anticipate which of the machines will get the 40-50
>> seconds pause and I also cannot profile all of them all the time.
>>
>> Any suggestions?
>>
>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com>
>> wrote:
>>
>>> In Flink 1.10, there's a huge change in the memory management compared
>>> to previous versions. This could be related to your observations, because
>>> with the same configurations, it is possible that there's less JVM heap
>>> space (with more off-heap memory). Please take a look at this migration
>>> guide [1].
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>
>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com> wrote:
>>>
>>>> Thanks for the suggestions!
>>>>
>>>> > i recently tried 1.10 and see this error frequently. and i dont have
>>>> the same issue when running with 1.9.1
>>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>>> occurrences in the heartbeat timeout
>>>>
>>>>
>>>> >
>>>>
>>>>    - Probably the most straightforward way is to try increasing the
>>>>    timeout to see if that helps. You can leverage the configuration option
>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>    - It might be helpful to share your configuration setups (e.g., the
>>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>>>    and all the loaded configurations.
>>>>    - You may want to look into the GC logs in addition to the metrics.
>>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>>    recent metrics due to the process not responding to the metric querying
>>>>    services.
>>>>    - You may also look into the status of the JM process. If JM is
>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>    message from TM is not timely handled before the timeout check.
>>>>    - Is there any metrics monitoring the network condition between the
>>>>    JM and timeouted TM? Possibly any jitters?
>>>>
>>>>
>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>>> the case.
>>>>
>>>> Does anyone know what can cause high GC time and how to mitigate this?
>>>>
>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ori,
>>>>>
>>>>> Here are some suggestions from my side.
>>>>>
>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>    timeout to see if that helps. You can leverage the configuration option
>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>    - It might be helpful to share your configuration setups (e.g.,
>>>>>    the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is
>>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>>    parameters and all the loaded configurations.
>>>>>    - You may want to look into the GC logs in addition to the
>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to see the
>>>>>    most recent metrics due to the process not responding to the metric
>>>>>    querying services.
>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>>    message from TM is not timely handled before the timeout check.
>>>>>    - Is there any metrics monitoring the network condition between
>>>>>    the JM and timeouted TM? Possibly any jitters?
>>>>>
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>
>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>> partitions and I have parallelism of 189.
>>>>>>
>>>>>> Currently running with RocksDB, with checkpointing disabled. My state
>>>>>> size is appx. 500gb.
>>>>>>
>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with
>>>>>> no apparent reason.
>>>>>>
>>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>>> load, total out-records, total in-records, backpressure, and everything I
>>>>>> can think of. But all those metrics show that there's nothing unusual, and
>>>>>> it has around average values for all those metrics. There are a lot of
>>>>>> other containers which score higher.
>>>>>>
>>>>>> All the metrics are very low because every TaskManager runs on a
>>>>>> r5.2xlarge machine alone.
>>>>>>
>>>>>> I'm trying to debug this for days and I cannot find any explanation
>>>>>> for it.
>>>>>>
>>>>>> Can someone explain why it's happening?
>>>>>>
>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
>>>>>> id container_1593074931633_0011_01_000127 timed out.
>>>>>>     at org.apache.flink.runtime.jobmaster.
>>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(
>>>>>> JobMaster.java:1147)
>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>>>>>> HeartbeatMonitorImpl.java:109)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>>>>>> .java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
>>>>>> AkkaRpcActor.java:397)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>> .handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>>>>> AkkaRpcActor.java:152)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:
>>>>>> 123)
>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements
>>>>>> .scala:21)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>> .scala:170)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>> .scala:171)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>> .scala:171)
>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225
>>>>>> )
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:
>>>>>> 260)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>> ForkJoinPool.java:1339)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool
>>>>>> .java:1979)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>

Re: Heartbeat of TaskManager timed out.

Posted by Xintong Song <to...@gmail.com>.
Maybe you can share the log and gc-log of the problematic TaskManager? See
if we can find any clue.

Thank you~

Xintong Song



On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <or...@gmail.com> wrote:

> I've found out that sometimes one of my TaskManagers experiences a GC
> pause of 40-50 seconds and I have no idea why.
> I profiled one of the machines using JProfiler and everything looks fine.
> No memory leaks, memory is low.
> However, I cannot anticipate which of the machines will get the 40-50
> seconds pause and I also cannot profile all of them all the time.
>
> Any suggestions?
>
> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <to...@gmail.com>
> wrote:
>
>> In Flink 1.10, there's a huge change in the memory management compared to
>> previous versions. This could be related to your observations, because with
>> the same configurations, it is possible that there's less JVM heap space
>> (with more off-heap memory). Please take a look at this migration guide [1].
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>
>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <or...@gmail.com> wrote:
>>
>>> Thanks for the suggestions!
>>>
>>> > i recently tried 1.10 and see this error frequently. and i dont have
>>> the same issue when running with 1.9.1
>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>> occurrences in the heartbeat timeout
>>>
>>>
>>> >
>>>
>>>    - Probably the most straightforward way is to try increasing the
>>>    timeout to see if that helps. You can leverage the configuration option
>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>    - It might be helpful to share your configuration setups (e.g., the
>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>>    and all the loaded configurations.
>>>    - You may want to look into the GC logs in addition to the metrics.
>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>    recent metrics due to the process not responding to the metric querying
>>>    services.
>>>    - You may also look into the status of the JM process. If JM is
>>>    under significant GC pressure, it could also happen that the heartbeat
>>>    message from TM is not timely handled before the timeout check.
>>>    - Is there any metrics monitoring the network condition between the
>>>    JM and timeouted TM? Possibly any jitters?
>>>
>>>
>>> Weirdly enough, I did manage to find a problem with the timed out
>>> TaskManagers, which slipped away the last time I checked: The timed out
>>> TaskManager is always the one with the max. GC time (young generation). I
>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>> the case.
>>>
>>> Does anyone know what can cause high GC time and how to mitigate this?
>>>
>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <to...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ori,
>>>>
>>>> Here are some suggestions from my side.
>>>>
>>>>    - Probably the most straightforward way is to try increasing the
>>>>    timeout to see if that helps. You can leverage the configuration option
>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>    - It might be helpful to share your configuration setups (e.g., the
>>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>>>    and all the loaded configurations.
>>>>    - You may want to look into the GC logs in addition to the metrics.
>>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>>    recent metrics due to the process not responding to the metric querying
>>>>    services.
>>>>    - You may also look into the status of the JM process. If JM is
>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>    message from TM is not timely handled before the timeout check.
>>>>    - Is there any metrics monitoring the network condition between the
>>>>    JM and timeouted TM? Possibly any jitters?
>>>>
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>
>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <or...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>> partitions and I have parallelism of 189.
>>>>>
>>>>> Currently running with RocksDB, with checkpointing disabled. My state
>>>>> size is appx. 500gb.
>>>>>
>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with
>>>>> no apparent reason.
>>>>>
>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>> load, total out-records, total in-records, backpressure, and everything I
>>>>> can think of. But all those metrics show that there's nothing unusual, and
>>>>> it has around average values for all those metrics. There are a lot of
>>>>> other containers which score higher.
>>>>>
>>>>> All the metrics are very low because every TaskManager runs on a
>>>>> r5.2xlarge machine alone.
>>>>>
>>>>> I'm trying to debug this for days and I cannot find any explanation
>>>>> for it.
>>>>>
>>>>> Can someone explain why it's happening?
>>>>>
>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
>>>>> id container_1593074931633_0011_01_000127 timed out.
>>>>>     at org.apache.flink.runtime.jobmaster.
>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(
>>>>> JobMaster.java:1147)
>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>>>>> HeartbeatMonitorImpl.java:109)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>>>>> .java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
>>>>> AkkaRpcActor.java:397)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>> .handleRpcMessage(AkkaRpcActor.java:190)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>>>> AkkaRpcActor.java:152)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:
>>>>> 123)
>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements
>>>>> .scala:21)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>>>> 170)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>>>> 171)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>>>> 171)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:
>>>>> 260)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>> ForkJoinPool.java:1339)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool
>>>>> .java:1979)
>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>>>> ForkJoinWorkerThread.java:107)
>>>>>
>>>>> Thanks
>>>>>
>>>>