Posted to user@flink.apache.org by Shailesh Jain <sh...@stellapps.com> on 2018/08/10 04:59:15 UTC

Re: Standalone cluster instability

Hi,

I hit a similar issue yesterday: the task manager died suspiciously. There
are no error logs in the task manager logs, but I see the following
exceptions in the job manager logs:

2018-08-05 18:03:28,322 ERROR akka.remote.Remoting                                          - Association to [akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
        at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

but almost 3 days later it hit this:

2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
        at akka.actor.ActorCell.invoke(ActorCell.scala:494)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

followed by:

2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0
        at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
        at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
        at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
        at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
        at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
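
For reference, the two settings that exception message points at ("operator parallelism" and "slots per TaskManager") live in flink-conf.yaml; an illustrative sketch only, not a recommendation for this job:

# flink-conf.yaml (illustrative values)
# Number of slots offered by each TaskManager; with a single TaskManager it
# must cover the highest operator parallelism in the job.
taskmanager.numberOfTaskSlots: 4
# Default parallelism used when the job does not set one explicitly.
parallelism.default: 1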

There are no error logs in the task manager, and the following is the last
memory-consumption log from the task manager:

2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB (used/committed/max)]
2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]

So I think it rules out OOM as a cause for this crash.

Any ideas/leads to debug this would be really helpful. The cluster is
running on version 1.4.2.

Thanks,
Shailesh

On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <
alexander.smirnoff@gmail.com> wrote:

> Hi Piotr,
>
> I didn't find anything special in the logs before the failure.
> Here are the logs, please take a look:
>
> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing
>
> The configuration is:
>
> 3 task managers:
> qafdsflinkw011.scl
> qafdsflinkw012.scl
> qafdsflinkw013.scl - lost connection
>
> 3 job  managers:
> qafdsflinkm011.scl - the leader
> qafdsflinkm012.scl
> qafdsflinkm013.scl
>
> 3 zookeepers:
> qafdsflinkzk011.scl
> qafdsflinkzk012.scl
> qafdsflinkzk013.scl
>
> Thank you,
> Alex
>
>
>
> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Does the issue really happen after 48 hours?
>> Is there some indication of a failure in TaskManager log?
>>
>> If you will be still unable to solve the problem, please provide full
>> TaskManager and JobManager logs.
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 16:00, Alexander Smirnov <al...@gmail.com>
>> wrote:
>>
>> One more question - I see a lot of line like the following in the logs
>>
>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>>
>>
>> The host is available, but I don't understand where port number comes
>> from. Task Manager uses another port (which is printed in logs on startup)
>> Could you please help to understand why it happens?
>>
>> Thank you,
>> Alex
>>
>>
>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <
>> alexander.smirnoff@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I've assembled a standalone cluster of 3 task managers and 3 job
>>> managers(and 3 ZK) following the instructions at
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html and
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>>>
>>> It works ok, but randomly, task managers becomes unavailable. JobManager
>>> has exception like below in logs:
>>>
>>>
>>> [2018-03-19 00:33:10,211] WARN Association with remote system
>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed,
>>> address is now gated for [5000] ms. Reason: [Association failed with
>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by:
>>> [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
>>> (akka.remote.ReliableDeliverySupervisor)
>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@
>>> qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>> for too long. (more than 48.0 hours)
>>>         at akka.remote.ReliableDeliverySupervisor$$
>>> anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at akka.remote.ReliableDeliverySupervisor.
>>> aroundReceive(Endpoint.scala:203)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>> ForkJoinTask.java:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>> runTask(ForkJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>> ForkJoinPool.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>>
>>> I can't find a reason for this exception, any ideas?
>>>
>>> Thank you,
>>> Alex
>>>
>>
>>

Re: ***UNCHECKED*** Re: Standalone cluster instability

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

The JobManager is not responsible for, and has no means of, restarting a TaskManager whose process has been killed (it would need to ssh into the machine and restart it…). I don’t know, but from your description of the problem I presume that Flink’s bash startup scripts do not contain a watchdog that restarts the process in case of failure. In that case just search for “bash watchdog”; see for example https://stackoverflow.com/a/697064/8149051 (a minimal sketch along those lines follows below).
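
A minimal watchdog sketch, assuming FLINK_HOME points at the Flink installation and that bin/taskmanager.sh supports the start-foreground action (recent 1.x releases do):

#!/usr/bin/env bash
# Restart the TaskManager whenever its process exits, with a small back-off.
FLINK_HOME=${FLINK_HOME:-/opt/flink}   # assumed install path, adjust as needed

while true; do
  "${FLINK_HOME}/bin/taskmanager.sh" start-foreground
  echo "$(date) TaskManager exited; restarting in 10s" >&2
  sleep 10
done

Running the TaskManager under a process supervisor (e.g. a systemd unit with Restart=always) achieves the same effect with less custom scripting.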

Probably a better way would be to use YARN or another resource manager. Flink’s JobManager would then redeploy/reschedule a new TaskManager after a failure (for example, by running a YARN session as sketched below).
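
For example, on Flink 1.4 a YARN session can be started with something like the following (flag meanings per the 1.4 YARN documentation; the numbers are purely illustrative):

# 2 TaskManagers, 4 slots each, 4 GB per TaskManager, 1 GB JobManager, detached
./bin/yarn-session.sh -n 2 -s 4 -tm 4096 -jm 1024 -d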

Piotrek

> On 19 Sep 2018, at 09:35, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Hi Piotrek,
> 
> We've hit the same issue again, kernel is repeatedly killing the task manager process (we've hit it 3 times in the past one week).
> We suspect we're hitting this bug in the kernel: https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1655842 <https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1655842>
> 
> One question I have is that why is the job manager not able to restart the task manager process when it discovers that it has been lost? It reports that there are no active task managers and available slots are 0. We're running on flink version 1.4.2.
> 
> I've attached the syslog and jobmanager log, the crash happened at Sep 18 23:31:14.
> 
> Thanks,
> Shailesh
> 
> On Thu, Aug 16, 2018 at 5:40 PM Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
> I’m not aware of such rules of thumb. Memory consumption is highly application and workload specific. It depends on how much things you allocate in your user code and how much memory do you keep on state (in case of heap state backend). Basically just as with most java applications, you have to use trial and error method.
> 
> One good practice is to before any deployment, test your Flink application on a testing cluster, that is identical to production cluster, by (re)processing some of the production workload/backlog/data (in parallel to production cluster).
> 
> Piotrek 
> 
>> On 16 Aug 2018, at 13:23, Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>> 
>> Thank you for your help Piotrek.
>> 
>> I think it was a combination of a. other processes taking up available memory and b. flink processes consuming all the memory allocated to them, that resulted in kernel running out of memory.
>> 
>> Are there any heuristics or best practices which you (or anyone in the community) recommend to benchmark memory requirements of a particular flink job?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Good that we are more or less on track with this problem :) But the problem here is not that heap size is too small, bot that your kernel is running out of memory and starts killing processes. Either:
>> 
>> 1. some other process is using the available memory 
>> 2. Increase memory allocation on your machine/virtual machine/container/cgroup
>> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease network memory buffer pool). Of course for any given job/state size/configuration/cluster size there is some minimal reasonable memory size that you have to assign to Flink, otherwise you will have poor performance and/or constant garbage collections and/or you will start getting OOM errors from JVM (don’t confuse those with OS/kernel's OOM errors - those two are on a different level).
>> 
>> Piotrek
>> 
>> 
>>> On 14 Aug 2018, at 07:36, Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> Thanks for your reply. I checked through the syslogs for that time, and I see this:
>>> 
>>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill process 2305 (java) score 468 or sacrifice child
>>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>>> 
>>> As you pointed out, kernel killed the task manager process.
>>> 
>>> If I had already set the max heap size for the JVM (to 3GB in this case), and the memory usage stats showed 2329MB being used 90 seconds earlier, it seems a bit unlikely for operators to consume 700 MB heap space in that short time, because our events ingestion rate is not that high (close to 10 events per minute).
>>> 
>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>>> 
>>> Is it possible to log individual operator's memory consumption? This would help in narrowing down on the root cause. There were around 50 operators running (~8 kafka source/sink, ~8 Window operators, and the rest CEP operators).
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> Please post full TaskManager logs, including stderr and stdout. (Have you checked the stderr/stdout for some messages?)
>>> 
>>> I could think of couple reasons:
>>> 1. process segfault
>>> 2. process killed by OS
>>> 3. OS failure
>>> 
>>> 1. Should be visible by some message in stderr/stdout file and can be caused by for example JVM, RocksDB or some other native library/code bug. 
>>> 2. Is your system maybe running out of memory? Kernel might kill process if that’s happening. You can also check system (linux?) logs for errors that correlate in time. Where are those logs depend on your OS. 
>>> 3. This might be tricky, but I have seen kernel failures that prevented any messages from being logged for example. Besides this TaskManager failure is your machine operating normally without any other problems/crashes/restarts?
>>> 
>>> Piotrek
>>> 
>>>> On 10 Aug 2018, at 06:59, Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I hit a similar issue yesterday, the task manager died suspiciously, no error logs in the task manager logs, but I see the following exceptions in the job manager logs:
>>>> 
>>>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting                                          - Association to [akka.tcp://flink@localhost:34483 <>] with UID [328996232] irrecoverably failed. Quarantining address.
>>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> 
>>>> but almost 3 days later it hit this:
>>>> 
>>>> 2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
>>>> java.lang.Exception: TaskManager was lost/killed: 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>>>>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>         at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>         at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>         at org.apache.flink.runtime.jobmanager.JobManager.org <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> 
>>>> followed by:
>>>> 
>>>> 2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0
>>>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
>>>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
>>>>         at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
>>>>         at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>>>>         at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
>>>>         at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
>>>>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
>>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
>>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
>>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
>>>>         at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>>>         at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> 
>>>> There are no error logs in task manager, and following is the last memory consumption log by task manager:
>>>> 
>>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
>>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB (used/committed/max)]
>>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]
>>>> 
>>>> So I think it rules out OOM as a cause for this crash.
>>>> 
>>>> Any ideas/leads to debug this would be really helpful. The cluster is running on version 1.4.2.
>>>> 
>>>> Thanks,
>>>> Shailesh
>>>> 
>>>> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>>> Hi Piotr,
>>>> 
>>>> I didn't find anything special in the logs before the failure. 
>>>> Here are the logs, please take a look:
>>>> 
>>>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing <https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing>
>>>> 
>>>> The configuration is:
>>>> 
>>>> 3 task managers:
>>>> qafdsflinkw011.scl 
>>>> qafdsflinkw012.scl 
>>>> qafdsflinkw013.scl - lost connection
>>>> 
>>>> 3 job  managers:
>>>> qafdsflinkm011.scl - the leader
>>>> qafdsflinkm012.scl 
>>>> qafdsflinkm013.scl 
>>>> 
>>>> 3 zookeepers:
>>>> qafdsflinkzk011.scl
>>>> qafdsflinkzk012.scl
>>>> qafdsflinkzk013.scl
>>>> 
>>>> Thank you,
>>>> Alex
>>>> 
>>>> 
>>>> 
>>>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>>>> Hi,
>>>> 
>>>> Does the issue really happen after 48 hours? 
>>>> Is there some indication of a failure in TaskManager log?
>>>> 
>>>> If you will be still unable to solve the problem, please provide full TaskManager and JobManager logs.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>>>> 
>>>>> One more question - I see a lot of line like the following in the logs
>>>>> 
>>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320 <http://flink@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:41068 <http://flink@qafdsflinkw811.nn.five9lab.com:41068/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40677 <http://flink@qafdsflinkw811.nn.five9lab.com:40677/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40382 <http://flink@qafdsflinkw811.nn.five9lab.com:40382/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:44744 <http://flink@qafdsflinkw811.nn.five9lab.com:44744/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> 
>>>>> 
>>>>> The host is available, but I don't understand where port number comes from. Task Manager uses another port (which is printed in logs on startup)
>>>>> Could you please help to understand why it happens?
>>>>> 
>>>>> Thank you,
>>>>> Alex
>>>>> 
>>>>> 
>>>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>>>> Hello,
>>>>> 
>>>>> I've assembled a standalone cluster of 3 task managers and 3 job managers(and 3 ZK) following the instructions at 
>>>>> 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html> and https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html>
>>>>> 
>>>>> It works ok, but randomly, task managers becomes unavailable. JobManager has exception like below in logs:
>>>>> 
>>>>> 
>>>>> [2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>]] Caused by: [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413 <http://qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413>] (akka.remote.ReliableDeliverySupervisor)
>>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320 <http://flink@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> 
>>>>> I can't find a reason for this exception, any ideas?
>>>>> 
>>>>> Thank you,
>>>>> Alex
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> <syslog.1><flink-smoketest-jobmanager-0-smoketest.com.log.bz2>


***UNCHECKED*** Re: Standalone cluster instability

Posted by Shailesh Jain <sh...@stellapps.com>.
Hi Piotrek,

We've hit the same issue again: the kernel is repeatedly killing the task
manager process (we've hit it 3 times in the past week).
We suspect we're hitting this bug in the kernel:
https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1655842

One question I have is: why is the job manager not able to restart the
task manager process when it discovers that it has been lost? It reports
that there are no active task managers and that available slots are 0. We're
running Flink version 1.4.2.

I've attached the syslog and jobmanager log, the crash happened at Sep 18
23:31:14.

Thanks,
Shailesh

On Thu, Aug 16, 2018 at 5:40 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> I’m not aware of such rules of thumb. Memory consumption is highly
> application and workload specific. It depends on how much things you
> allocate in your user code and how much memory do you keep on state (in
> case of heap state backend). Basically just as with most java applications,
> you have to use trial and error method.
>
> One good practice is to before any deployment, test your Flink application
> on a testing cluster, that is identical to production cluster, by
> (re)processing some of the production workload/backlog/data (in parallel to
> production cluster).
>
> Piotrek
>
> On 16 Aug 2018, at 13:23, Shailesh Jain <sh...@stellapps.com>
> wrote:
>
> Thank you for your help Piotrek.
>
> I think it was a combination of a. other processes taking up available
> memory and b. flink processes consuming all the memory allocated to them,
> that resulted in kernel running out of memory.
>
> Are there any heuristics or best practices which you (or anyone in the
> community) recommend to benchmark memory requirements of a particular flink
> job?
>
> Thanks,
> Shailesh
>
>
> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Good that we are more or less on track with this problem :) But the
>> problem here is not that heap size is too small, bot that your kernel is
>> running out of memory and starts killing processes. Either:
>>
>> 1. some other process is using the available memory
>> 2. Increase memory allocation on your machine/virtual
>> machine/container/cgroup
>> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease
>> network memory buffer pool). Of course for any given job/state
>> size/configuration/cluster size there is some minimal reasonable memory
>> size that you have to assign to Flink, otherwise you will have poor
>> performance and/or constant garbage collections and/or you will start
>> getting OOM errors from JVM (don’t confuse those with OS/kernel's OOM
>> errors - those two are on a different level).
>>
>> Piotrek
>>
>>
>> On 14 Aug 2018, at 07:36, Shailesh Jain <sh...@stellapps.com>
>> wrote:
>>
>> Hi Piotrek,
>>
>> Thanks for your reply. I checked through the syslogs for that time, and I
>> see this:
>>
>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill
>> process 2305 (java) score 468 or sacrifice child
>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305
>> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>>
>> As you pointed out, kernel killed the task manager process.
>>
>> If I had already set the max heap size for the JVM (to 3GB in this case),
>> and the memory usage stats showed 2329MB being used 90 seconds earlier, it
>> seems a bit unlikely for operators to consume 700 MB heap space in that
>> short time, because our events ingestion rate is not that high (close to 10
>> events per minute).
>>
>> 2018-08-08 13:19:23,341 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Memory
>> usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB
>> (used/committed/max)]
>>
>> Is it possible to log individual operator's memory consumption? This
>> would help in narrowing down on the root cause. There were around 50
>> operators running (~8 kafka source/sink, ~8 Window operators, and the rest
>> CEP operators).
>>
>> Thanks,
>> Shailesh
>>
>> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Please post full TaskManager logs, including stderr and stdout. (Have
>>> you checked the stderr/stdout for some messages?)
>>>
>>> I could think of couple reasons:
>>> 1. process segfault
>>> 2. process killed by OS
>>> 3. OS failure
>>>
>>> 1. Should be visible by some message in stderr/stdout file and can be
>>> caused by for example JVM, RocksDB or some other native library/code bug.
>>> 2. Is your system maybe running out of memory? Kernel might kill process
>>> if that’s happening. You can also check system (linux?) logs for errors
>>> that correlate in time. Where are those logs depend on your OS.
>>> 3. This might be tricky, but I have seen kernel failures that prevented
>>> any messages from being logged for example. Besides this TaskManager
>>> failure is your machine operating normally without any other
>>> problems/crashes/restarts?
>>>
>>> Piotrek
>>>
>>> On 10 Aug 2018, at 06:59, Shailesh Jain <sh...@stellapps.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I hit a similar issue yesterday, the task manager died suspiciously, no
>>> error logs in the task manager logs, but I see the following exceptions in
>>> the job manager logs:
>>>
>>> 2018-08-05 18:03:28,322 ERROR
>>> akka.remote.Remoting                                          - Association
>>> to [akka.tcp://flink@localhost:34483] with UID [328996232]
>>> irrecoverably failed. Quarantining address.
>>> java.util.concurrent.TimeoutException: Remote system has been silent for
>>> too long. (more than 48.0 hours)
>>>         at
>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at
>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> but almost 3 days later it hit this:
>>>
>>> 2018-08-08 13:22:00,061 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>> Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from
>>> state RUNNING to FAILING.
>>> java.lang.Exception: TaskManager was lost/killed:
>>> 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>>>         at
>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>         at
>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>         at
>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>         at
>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>         at
>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>         at org.apache.flink.runtime.jobmanager.JobManager.org
>>> <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>
>>> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>         at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>         at
>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>         at
>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>         at
>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>         at
>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>         at
>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>         at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>         at
>>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at
>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at
>>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> followed by:
>>>
>>> 2018-08-08 13:22:20,090 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>> Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from
>>> state RUNNING to FAILING.
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Not enough free slots available to run the job. You can decrease the
>>> operator parallelism or increase the number of slots per TaskManager in the
>>> configuration. Task to schedule: < Attempt #2 (Source: Custom Source ->
>>> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID <
>>> fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup
>>> [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7,
>>> 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler:
>>> Number of instances=0, total number of slots=0, available slots=0
>>>         at
>>> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
>>>         at
>>> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
>>>         at
>>> org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
>>>         at
>>> java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>>>         at
>>> java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
>>>         at
>>> org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
>>>         at
>>> org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>>         at
>>> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>>         at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>> There are no error logs in task manager, and following is the last
>>> memory consumption log by task manager:
>>>
>>> 2018-08-08 13:19:23,341 INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager              - Memory
>>> usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB
>>> (used/committed/max)]
>>> 2018-08-08 13:19:23,341 INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager              - Direct
>>> memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
>>> 2018-08-08 13:19:23,341 INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
>>> pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace:
>>> 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB
>>> (used/committed/max)]
>>> 2018-08-08 13:19:23,341 INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
>>> collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT:
>>> 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]
>>>
>>> So I think it rules out OOM as a cause for this crash.
>>>
>>> Any ideas/leads to debug this would be really helpful. The cluster is
>>> running on version 1.4.2.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <
>>> alexander.smirnoff@gmail.com> wrote:
>>>
>>>> Hi Piotr,
>>>>
>>>> I didn't find anything special in the logs before the failure.
>>>> Here are the logs, please take a look:
>>>>
>>>>
>>>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing
>>>>
>>>> The configuration is:
>>>>
>>>> 3 task managers:
>>>> qafdsflinkw011.scl
>>>> qafdsflinkw012.scl
>>>> qafdsflinkw013.scl - lost connection
>>>>
>>>> 3 job  managers:
>>>> qafdsflinkm011.scl - the leader
>>>> qafdsflinkm012.scl
>>>> qafdsflinkm013.scl
>>>>
>>>> 3 zookeepers:
>>>> qafdsflinkzk011.scl
>>>> qafdsflinkzk012.scl
>>>> qafdsflinkzk013.scl
>>>>
>>>> Thank you,
>>>> Alex
>>>>
>>>>
>>>>
>>>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <pi...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does the issue really happen after 48 hours?
>>>>> Is there some indication of a failure in TaskManager log?
>>>>>
>>>>> If you will be still unable to solve the problem, please provide full
>>>>> TaskManager and JobManager logs.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <
>>>>> alexander.smirnoff@gmail.com> wrote:
>>>>>
>>>>> One more question - I see a lot of line like the following in the logs
>>>>>
>>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is
>>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is
>>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is
>>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is
>>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is
>>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>>
>>>>>
>>>>> The host is available, but I don't understand where port number comes
>>>>> from. Task Manager uses another port (which is printed in logs on startup)
>>>>> Could you please help to understand why it happens?
>>>>>
>>>>> Thank you,
>>>>> Alex
>>>>>
>>>>>
>>>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <
>>>>> alexander.smirnoff@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I've assembled a standalone cluster of 3 task managers and 3 job
>>>>>> managers(and 3 ZK) following the instructions at
>>>>>>
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html and
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>>>>>>
>>>>>> It works ok, but randomly, task managers becomes unavailable.
>>>>>> JobManager has exception like below in logs:
>>>>>>
>>>>>>
>>>>>> [2018-03-19 00:33:10,211] WARN Association with remote system
>>>>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed,
>>>>>> address is now gated for [5000] ms. Reason: [Association failed with
>>>>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by:
>>>>>> [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
>>>>>> (akka.remote.ReliableDeliverySupervisor)
>>>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
>>>>>> flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>>>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>>>>> for too long. (more than 48.0 hours)
>>>>>>         at
>>>>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>         at
>>>>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>         at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>         at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>         at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>         at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> I can't find a reason for this exception, any ideas?
>>>>>>
>>>>>> Thank you,
>>>>>> Alex
>>>>>>
>>>>>
>>>>>
>>>
>>>
>>
>>
>
>

Re: Standalone cluster instability

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

I’m not aware of such rules of thumb. Memory consumption is highly application and workload specific. It depends on how much you allocate in your user code and how much memory you keep in state (in the case of the heap state backend). Basically, just as with most Java applications, you have to use a trial-and-error method (the sketch below shows the configuration keys that are usually involved).
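
If it helps, the knobs most often turned during that trial and error are the TaskManager heap and the network buffer pool; an illustrative flink-conf.yaml sketch (keys as in the 1.4 configuration docs, values are examples only):

# flink-conf.yaml (illustrative values)
# JVM heap for each TaskManager, in MB.
taskmanager.heap.mb: 3072
# Fraction of memory reserved for the network buffer pool.
taskmanager.network.memory.fraction: 0.1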

One good practice is, before any deployment, to test your Flink application on a testing cluster that is identical to the production cluster, by (re)processing some of the production workload/backlog/data (in parallel to the production cluster).

Piotrek 

> On 16 Aug 2018, at 13:23, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Thank you for your help Piotrek.
> 
> I think it was a combination of a. other processes taking up available memory and b. flink processes consuming all the memory allocated to them, that resulted in kernel running out of memory.
> 
> Are there any heuristics or best practices which you (or anyone in the community) recommend to benchmark memory requirements of a particular flink job?
> 
> Thanks,
> Shailesh
> 
> 
> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
> Good that we are more or less on track with this problem :) But the problem here is not that heap size is too small, bot that your kernel is running out of memory and starts killing processes. Either:
> 
> 1. some other process is using the available memory 
> 2. Increase memory allocation on your machine/virtual machine/container/cgroup
> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease network memory buffer pool). Of course for any given job/state size/configuration/cluster size there is some minimal reasonable memory size that you have to assign to Flink, otherwise you will have poor performance and/or constant garbage collections and/or you will start getting OOM errors from JVM (don’t confuse those with OS/kernel's OOM errors - those two are on a different level).
> 
> Piotrek
> 
> 
>> On 14 Aug 2018, at 07:36, Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> Thanks for your reply. I checked through the syslogs for that time, and I see this:
>> 
>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill process 2305 (java) score 468 or sacrifice child
>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>> 
>> As you pointed out, kernel killed the task manager process.
>> 
>> If I had already set the max heap size for the JVM (to 3GB in this case), and the memory usage stats showed 2329MB being used 90 seconds earlier, it seems a bit unlikely for operators to consume 700 MB heap space in that short time, because our events ingestion rate is not that high (close to 10 events per minute).
>> 
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>> 
>> Is it possible to log individual operator's memory consumption? This would help in narrowing down on the root cause. There were around 50 operators running (~8 kafka source/sink, ~8 Window operators, and the rest CEP operators).
>> 
>> Thanks,
>> Shailesh
>> 
>> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Please post full TaskManager logs, including stderr and stdout. (Have you checked the stderr/stdout for some messages?)
>> 
>> I could think of couple reasons:
>> 1. process segfault
>> 2. process killed by OS
>> 3. OS failure
>> 
>> 1. Should be visible by some message in stderr/stdout file and can be caused by for example JVM, RocksDB or some other native library/code bug. 
>> 2. Is your system maybe running out of memory? Kernel might kill process if that’s happening. You can also check system (linux?) logs for errors that correlate in time. Where are those logs depend on your OS. 
>> 3. This might be tricky, but I have seen kernel failures that prevented any messages from being logged for example. Besides this TaskManager failure is your machine operating normally without any other problems/crashes/restarts?
>> 
>> Piotrek
>> 
>>> On 10 Aug 2018, at 06:59, Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I hit a similar issue yesterday, the task manager died suspiciously, no error logs in the task manager logs, but I see the following exceptions in the job manager logs:
>>> 
>>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting                                          - Association to [akka.tcp://flink@localhost:34483 <>] with UID [328996232] irrecoverably failed. Quarantining address.
>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 
>>> but almost 3 days later it hit this:
>>> 
>>> 2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
>>> java.lang.Exception: TaskManager was lost/killed: 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>>>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>         at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>         at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 
>>> followed by:
>>> 
>>> 2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0
>>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
>>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
>>>         at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
>>>         at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>>>         at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
>>>         at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
>>>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
>>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
>>>         at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>>         at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> 
>>> There are no error logs in task manager, and following is the last memory consumption log by task manager:
>>> 
>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB (used/committed/max)]
>>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]
>>> 
>>> So I think it rules out OOM as a cause for this crash.
>>> 
>>> Any ideas/leads to debug this would be really helpful. The cluster is running on version 1.4.2.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>> Hi Piotr,
>>> 
>>> I didn't find anything special in the logs before the failure. 
>>> Here are the logs, please take a look:
>>> 
>>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing <https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing>
>>> 
>>> The configuration is:
>>> 
>>> 3 task managers:
>>> qafdsflinkw011.scl 
>>> qafdsflinkw012.scl 
>>> qafdsflinkw013.scl - lost connection
>>> 
>>> 3 job  managers:
>>> qafdsflinkm011.scl - the leader
>>> qafdsflinkm012.scl 
>>> qafdsflinkm013.scl 
>>> 
>>> 3 zookeepers:
>>> qafdsflinkzk011.scl
>>> qafdsflinkzk012.scl
>>> qafdsflinkzk013.scl
>>> 
>>> Thank you,
>>> Alex
>>> 
>>> 
>>> 
>>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> Does the issue really happen after 48 hours? 
>>> Is there some indication of a failure in TaskManager log?
>>> 
>>> If you will be still unable to solve the problem, please provide full TaskManager and JobManager logs.
>>> 
>>> Piotrek
>>> 
>>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> One more question - I see a lot of line like the following in the logs
>>>> 
>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320 <http://flink@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:41068 <http://flink@qafdsflinkw811.nn.five9lab.com:41068/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40677 <http://flink@qafdsflinkw811.nn.five9lab.com:40677/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40382 <http://flink@qafdsflinkw811.nn.five9lab.com:40382/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:44744 <http://flink@qafdsflinkw811.nn.five9lab.com:44744/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> 
>>>> 
>>>> The host is available, but I don't understand where port number comes from. Task Manager uses another port (which is printed in logs on startup)
>>>> Could you please help to understand why it happens?
>>>> 
>>>> Thank you,
>>>> Alex
>>>> 
>>>> 
>>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>>> Hello,
>>>> 
>>>> I've assembled a standalone cluster of 3 task managers and 3 job managers(and 3 ZK) following the instructions at 
>>>> 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html> and https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html>
>>>> 
>>>> It works ok, but randomly, task managers becomes unavailable. JobManager has exception like below in logs:
>>>> 
>>>> 
>>>> [2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>]] Caused by: [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413 <http://qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413>] (akka.remote.ReliableDeliverySupervisor)
>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320 <http://flink@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> 
>>>> I can't find a reason for this exception, any ideas?
>>>> 
>>>> Thank you,
>>>> Alex
>>> 
>>> 
>> 
>> 
> 
> 


Re: Standalone cluster instability

Posted by Shailesh Jain <sh...@stellapps.com>.
Thank you for your help Piotrek.

I think it was a combination of (a) other processes taking up the available
memory and (b) Flink processes consuming all the memory allocated to them
that resulted in the kernel running out of memory.

Are there any heuristics or best practices that you (or anyone in the
community) would recommend for benchmarking the memory requirements of a
particular Flink job?

Thanks,
Shailesh


On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Good that we are more or less on track with this problem :) But the
> problem here is not that heap size is too small, but that your kernel is
> running out of memory and starts killing processes. Either:
>
> 1. some other process is using the available memory
> 2. Increase memory allocation on your machine/virtual
> machine/container/cgroup
> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease
> network memory buffer pool). Of course for any given job/state
> size/configuration/cluster size there is some minimal reasonable memory
> size that you have to assign to Flink, otherwise you will have poor
> performance and/or constant garbage collections and/or you will start
> getting OOM errors from JVM (don’t confuse those with OS/kernel's OOM
> errors - those two are on a different level).
>
> Piotrek
>
>
> On 14 Aug 2018, at 07:36, Shailesh Jain <sh...@stellapps.com>
> wrote:
>
> Hi Piotrek,
>
> Thanks for your reply. I checked through the syslogs for that time, and I
> see this:
>
> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill
> process 2305 (java) score 468 or sacrifice child
> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305
> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>
> As you pointed out, kernel killed the task manager process.
>
> If I had already set the max heap size for the JVM (to 3GB in this case),
> and the memory usage stats showed 2329MB being used 90 seconds earlier, it
> seems a bit unlikely for operators to consume 700 MB heap space in that
> short time, because our events ingestion rate is not that high (close to 10
> events per minute).
>
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager              - Memory usage stats: [HEAP:
> 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>
> Is it possible to log individual operator's memory consumption? This would
> help in narrowing down on the root cause. There were around 50 operators
> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP
> operators).
>
> Thanks,
> Shailesh
>
> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Please post full TaskManager logs, including stderr and stdout. (Have you
>> checked the stderr/stdout for some messages?)
>>
>> I could think of couple reasons:
>> 1. process segfault
>> 2. process killed by OS
>> 3. OS failure
>>
>> 1. Should be visible by some message in stderr/stdout file and can be
>> caused by for example JVM, RocksDB or some other native library/code bug.
>> 2. Is your system maybe running out of memory? Kernel might kill process
>> if that’s happening. You can also check system (linux?) logs for errors
>> that correlate in time. Where are those logs depend on your OS.
>> 3. This might be tricky, but I have seen kernel failures that prevented
>> any messages from being logged for example. Besides this TaskManager
>> failure is your machine operating normally without any other
>> problems/crashes/restarts?
>>
>> Piotrek
>>
>> On 10 Aug 2018, at 06:59, Shailesh Jain <sh...@stellapps.com>
>> wrote:
>>
>> Hi,
>>
>> I hit a similar issue yesterday, the task manager died suspiciously, no
>> error logs in the task manager logs, but I see the following exceptions in
>> the job manager logs:
>>
>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting
>>                                 - Association to [
>> akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably
>> failed. Quarantining address.
>> java.util.concurrent.TimeoutException: Remote system has been silent for
>> too long. (more than 48.0 hours)
>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.
>> applyOrElse(Endpoint.scala:375)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(
>> Endpoint.scala:203)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>> java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>>
>> but almost 3 days later it hit this:
>>
>> 2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.execu
>> tiongraph.ExecutionGraph        - Job Internal state machine job
>> (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to
>> FAILING.
>> java.lang.Exception: TaskManager was lost/killed:
>> 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(
>> SimpleSlot.java:217)
>>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment
>> .releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(
>> SharedSlot.java:192)
>>         at org.apache.flink.runtime.instance.Instance.markDead(Instance
>> .java:167)
>>         at org.apache.flink.runtime.instance.InstanceManager.unregister
>> TaskManager(InstanceManager.java:212)
>>         at org.apache.flink.runtime.jobmanager.JobManager.org
>> <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>$apache$
>> flink$runtime$jobmanager$JobManager$$handleTaskManagerT
>> erminated(JobManager.scala:1198)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1.applyOrElse(JobManager.scala:1096)
>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>> unction.scala:36)
>>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun
>> $receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>> unction.scala:36)
>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>> es.scala:33)
>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>> es.scala:28)
>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scal
>> a:123)
>>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(
>> LogMessages.scala:28)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive
>> (JobManager.scala:122)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(Death
>> Watch.scala:46)
>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>> java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>>
>> followed by:
>>
>> 2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.execu
>> tiongraph.ExecutionGraph        - Job Internal state machine job
>> (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to
>> FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager in the
>> configuration. Task to schedule: < Attempt #2 (Source: Custom Source ->
>> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID <
>> fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup
>> [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7,
>> 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler:
>> Number of instances=0, total number of slots=0, available slots=0
>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.
>> scheduleTask(Scheduler.java:263)
>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.
>> allocateSlot(Scheduler.java:142)
>>         at org.apache.flink.runtime.executiongraph.Execution.lambda$all
>> ocateAndAssignSlotForExecution$1(Execution.java:440)
>>         at java.util.concurrent.CompletableFuture.uniComposeStage(Compl
>> etableFuture.java:981)
>>         at java.util.concurrent.CompletableFuture.thenCompose(Completab
>> leFuture.java:2124)
>>         at org.apache.flink.runtime.executiongraph.Execution.allocateAn
>> dAssignSlotForExecution(Execution.java:438)
>>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.a
>> llocateResourcesForAll(ExecutionJobVertex.java:503)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.sched
>> uleEager(ExecutionGraph.java:900)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.sched
>> uleForExecution(ExecutionGraph.java:854)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.resta
>> rt(ExecutionGraph.java:1175)
>>         at org.apache.flink.runtime.executiongraph.restart.ExecutionGra
>> phRestartCallback.triggerFullRecovery(ExecutionGraphRestartC
>> allback.java:59)
>>         at org.apache.flink.runtime.executiongraph.restart.FixedDelayRe
>> startStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(
>> Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFu
>> tureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFu
>> tureTask.run(ScheduledThreadPoolExecutor.java:293)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> There are no error logs in task manager, and following is the last memory
>> consumption log by task manager:
>>
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskm
>> anager.TaskManager              - Memory usage stats: [HEAP:
>> 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskm
>> anager.TaskManager              - Direct memory stats: Count: 115, Total
>> Capacity: 38101792, Used Memory: 38101793
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskm
>> anager.TaskManager              - Off-heap pool stats: [Code Cache:
>> 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB
>> (used/committed/max)], [Compressed Class Space: 11/17/1024 MB
>> (used/committed/max)]
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskm
>> anager.TaskManager              - Garbage collector stats: [G1 Young
>> Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC
>> TIME (ms): 152, GC COUNT: 2]
>>
>> So I think it rules out OOM as a cause for this crash.
>>
>> Any ideas/leads to debug this would be really helpful. The cluster is
>> running on version 1.4.2.
>>
>> Thanks,
>> Shailesh
>>
>> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <
>> alexander.smirnoff@gmail.com> wrote:
>>
>>> Hi Piotr,
>>>
>>> I didn't find anything special in the logs before the failure.
>>> Here are the logs, please take a look:
>>>
>>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lU
>>> X-bkn_x7QV59?usp=sharing
>>>
>>> The configuration is:
>>>
>>> 3 task managers:
>>> qafdsflinkw011.scl
>>> qafdsflinkw012.scl
>>> qafdsflinkw013.scl - lost connection
>>>
>>> 3 job  managers:
>>> qafdsflinkm011.scl - the leader
>>> qafdsflinkm012.scl
>>> qafdsflinkm013.scl
>>>
>>> 3 zookeepers:
>>> qafdsflinkzk011.scl
>>> qafdsflinkzk012.scl
>>> qafdsflinkzk013.scl
>>>
>>> Thank you,
>>> Alex
>>>
>>>
>>>
>>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does the issue really happen after 48 hours?
>>>> Is there some indication of a failure in TaskManager log?
>>>>
>>>> If you will be still unable to solve the problem, please provide full
>>>> TaskManager and JobManager logs.
>>>>
>>>> Piotrek
>>>>
>>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <
>>>> alexander.smirnoff@gmail.com> wrote:
>>>>
>>>> One more question - I see a lot of line like the following in the logs
>>>>
>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is
>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is
>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is
>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is
>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is
>>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>>
>>>>
>>>> The host is available, but I don't understand where port number comes
>>>> from. Task Manager uses another port (which is printed in logs on startup)
>>>> Could you please help to understand why it happens?
>>>>
>>>> Thank you,
>>>> Alex
>>>>
>>>>
>>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <
>>>> alexander.smirnoff@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I've assembled a standalone cluster of 3 task managers and 3 job
>>>>> managers(and 3 ZK) following the instructions at
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>>>> ops/deployment/cluster_setup.html and https://ci.apache.org/projects
>>>>> /flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>>>>>
>>>>> It works ok, but randomly, task managers becomes unavailable.
>>>>> JobManager has exception like below in logs:
>>>>>
>>>>>
>>>>> [2018-03-19 00:33:10,211] WARN Association with remote system
>>>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed,
>>>>> address is now gated for [5000] ms. Reason: [Association failed with
>>>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by:
>>>>> [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
>>>>> (akka.remote.ReliableDeliverySupervisor)
>>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
>>>>> flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>>>> for too long. (more than 48.0 hours)
>>>>>         at akka.remote.ReliableDeliverySu
>>>>> pervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>         at akka.remote.ReliableDeliverySu
>>>>> pervisor.aroundReceive(Endpoint.scala:203)
>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>         at scala.concurrent.forkjoin.Fork
>>>>> JoinTask.doExec(ForkJoinTask.java:260)
>>>>>         at scala.concurrent.forkjoin.Fork
>>>>> JoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>         at scala.concurrent.forkjoin.Fork
>>>>> JoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>         at scala.concurrent.forkjoin.Fork
>>>>> JoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> I can't find a reason for this exception, any ideas?
>>>>>
>>>>> Thank you,
>>>>> Alex
>>>>>
>>>>
>>>>
>>
>>
>
>

Re: Standalone cluster instability

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Good that we are more or less on track with this problem :) But the problem here is not that the heap size is too small, but that your kernel is running out of memory and starts killing processes. Either:

1. some other process is using the available memory,
2. increase the memory allocation on your machine/virtual machine/container/cgroup, or
3. decrease the heap size of Flink’s JVM or its non-heap size (decrease the network memory buffer pool). Of course, for any given job/state size/configuration/cluster size there is some minimal reasonable amount of memory that you have to assign to Flink; otherwise you will have poor performance and/or constant garbage collections and/or you will start getting OOM errors from the JVM (don’t confuse those with the OS/kernel's OOM errors - those two are on a different level).

Piotrek

> On 14 Aug 2018, at 07:36, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Hi Piotrek,
> 
> Thanks for your reply. I checked through the syslogs for that time, and I see this:
> 
> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill process 2305 (java) score 468 or sacrifice child
> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
> 
> As you pointed out, kernel killed the task manager process.
> 
> If I had already set the max heap size for the JVM (to 3GB in this case), and the memory usage stats showed 2329MB being used 90 seconds earlier, it seems a bit unlikely for operators to consume 700 MB heap space in that short time, because our events ingestion rate is not that high (close to 10 events per minute).
> 
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
> 
> Is it possible to log individual operator's memory consumption? This would help in narrowing down on the root cause. There were around 50 operators running (~8 kafka source/sink, ~8 Window operators, and the rest CEP operators).
> 
> Thanks,
> Shailesh
> 
> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
> Please post full TaskManager logs, including stderr and stdout. (Have you checked the stderr/stdout for some messages?)
> 
> I could think of couple reasons:
> 1. process segfault
> 2. process killed by OS
> 3. OS failure
> 
> 1. Should be visible by some message in stderr/stdout file and can be caused by for example JVM, RocksDB or some other native library/code bug. 
> 2. Is your system maybe running out of memory? Kernel might kill process if that’s happening. You can also check system (linux?) logs for errors that correlate in time. Where are those logs depend on your OS. 
> 3. This might be tricky, but I have seen kernel failures that prevented any messages from being logged for example. Besides this TaskManager failure is your machine operating normally without any other problems/crashes/restarts?
> 
> Piotrek
> 
>> On 10 Aug 2018, at 06:59, Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>> 
>> Hi,
>> 
>> I hit a similar issue yesterday, the task manager died suspiciously, no error logs in the task manager logs, but I see the following exceptions in the job manager logs:
>> 
>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting                                          - Association to [akka.tcp://flink@localhost:34483 <>] with UID [328996232] irrecoverably failed. Quarantining address.
>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> but almost 3 days later it hit this:
>> 
>> 2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
>> java.lang.Exception: TaskManager was lost/killed: 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>         at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>         at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> followed by:
>> 
>> 2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0
>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
>>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
>>         at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
>>         at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>>         at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
>>         at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
>>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
>>         at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>         at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> 
>> There are no error logs in task manager, and following is the last memory consumption log by task manager:
>> 
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB (used/committed/max)]
>> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]
>> 
>> So I think it rules out OOM as a cause for this crash.
>> 
>> Any ideas/leads to debug this would be really helpful. The cluster is running on version 1.4.2.
>> 
>> Thanks,
>> Shailesh
>> 
>> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>> Hi Piotr,
>> 
>> I didn't find anything special in the logs before the failure. 
>> Here are the logs, please take a look:
>> 
>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing <https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing>
>> 
>> The configuration is:
>> 
>> 3 task managers:
>> qafdsflinkw011.scl 
>> qafdsflinkw012.scl 
>> qafdsflinkw013.scl - lost connection
>> 
>> 3 job  managers:
>> qafdsflinkm011.scl - the leader
>> qafdsflinkm012.scl 
>> qafdsflinkm013.scl 
>> 
>> 3 zookeepers:
>> qafdsflinkzk011.scl
>> qafdsflinkzk012.scl
>> qafdsflinkzk013.scl
>> 
>> Thank you,
>> Alex
>> 
>> 
>> 
>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Does the issue really happen after 48 hours? 
>> Is there some indication of a failure in TaskManager log?
>> 
>> If you will be still unable to solve the problem, please provide full TaskManager and JobManager logs.
>> 
>> Piotrek
>> 
>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> One more question - I see a lot of line like the following in the logs
>>> 
>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320 <http://flink@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:41068 <http://flink@qafdsflinkw811.nn.five9lab.com:41068/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40677 <http://flink@qafdsflinkw811.nn.five9lab.com:40677/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40382 <http://flink@qafdsflinkw811.nn.five9lab.com:40382/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:44744 <http://flink@qafdsflinkw811.nn.five9lab.com:44744/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> 
>>> 
>>> The host is available, but I don't understand where port number comes from. Task Manager uses another port (which is printed in logs on startup)
>>> Could you please help to understand why it happens?
>>> 
>>> Thank you,
>>> Alex
>>> 
>>> 
>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <alexander.smirnoff@gmail.com <ma...@gmail.com>> wrote:
>>> Hello,
>>> 
>>> I've assembled a standalone cluster of 3 task managers and 3 job managers(and 3 ZK) following the instructions at 
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html> and https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html>
>>> 
>>> It works ok, but randomly, task managers becomes unavailable. JobManager has exception like below in logs:
>>> 
>>> 
>>> [2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413 <http://flink@qafdsflinkw811.nn.five9lab.com:42413/>]] Caused by: [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413 <http://qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413>] (akka.remote.ReliableDeliverySupervisor)
>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320 <http://flink@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 
>>> I can't find a reason for this exception, any ideas?
>>> 
>>> Thank you,
>>> Alex
>> 
>> 
> 
> 


Re: Standalone cluster instability

Posted by Shailesh Jain <sh...@stellapps.com>.
Hi Piotrek,

Thanks for your reply. I checked through the syslogs for that time, and I
see this:

Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill
process 2305 (java) score 468 or sacrifice child
Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305
(java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB

As you pointed out, the kernel killed the task manager process.

Given that I had already set the max heap size for the JVM (to 3 GB in this
case) and the memory usage stats showed 2329 MB in use 90 seconds earlier, it
seems unlikely that the operators consumed 700 MB of heap space in that short
a time, because our event ingestion rate is not that high (close to 10 events
per minute).

2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
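
As a rough sanity check against the kernel's numbers above (all figures approximate, taken from the TaskManager stats in this thread):

    committed heap                         3072 MB
    committed non-heap (metaspace etc.)     197 MB
    direct memory                           ~36 MB
    thread stacks + GC/JVM overhead         a few hundred MB
    ------------------------------------------------------
    expected resident size                  roughly 3.4 - 3.6 GB

which is already in the ballpark of the anon-rss of ~3.5 GB (3661216 kB) the OOM killer reported, so the process did not need any sudden jump in heap usage to reach the footprint it was killed at; the kernel reacts to overall memory pressure on the machine, not to this JVM's heap alone.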

Is it possible to log individual operators' memory consumption? That would
help in narrowing down the root cause. There were around 50 operators running
(~8 Kafka sources/sinks, ~8 window operators, and the rest CEP operators).
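
As far as I understand, the JVM only accounts for memory per process, not per operator, so exact per-operator numbers are not available out of the box. One workaround I am considering is to sample JVM memory from inside a specific operator; below is a minimal sketch (the class name and log format are made up for illustration):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Logs JVM-wide heap usage from inside an operator. The JVM does not
 * attribute heap to individual operators, so this only shows what the
 * whole TaskManager JVM is using at the moment this task samples it.
 */
public class MemoryLoggingMap<T> extends RichMapFunction<T, T> {

    private static final Logger LOG = LoggerFactory.getLogger(MemoryLoggingMap.class);

    private transient long count;

    @Override
    public void open(Configuration parameters) {
        count = 0;
    }

    @Override
    public T map(T value) {
        if (++count % 10_000 == 0) {  // sample occasionally to keep the overhead low
            Runtime rt = Runtime.getRuntime();
            long usedMb = (rt.totalMemory() - rt.freeMemory()) >> 20;
            LOG.info("{}: ~{} MB heap in use (JVM-wide)",
                    getRuntimeContext().getTaskNameWithSubtasks(), usedMb);
        }
        return value;
    }
}

Comparing the numbers logged from operators at different points in the pipeline will not give exact per-operator usage, but it could still hint at where the growth happens.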

Thanks,
Shailesh

On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Please post full TaskManager logs, including stderr and stdout. (Have you
> checked the stderr/stdout for some messages?)
>
> I could think of couple reasons:
> 1. process segfault
> 2. process killed by OS
> 3. OS failure
>
> 1. Should be visible by some message in stderr/stdout file and can be
> caused by for example JVM, RocksDB or some other native library/code bug.
> 2. Is your system maybe running out of memory? Kernel might kill process
> if that’s happening. You can also check system (linux?) logs for errors
> that correlate in time. Where are those logs depend on your OS.
> 3. This might be tricky, but I have seen kernel failures that prevented
> any messages from being logged for example. Besides this TaskManager
> failure is your machine operating normally without any other
> problems/crashes/restarts?
>
> Piotrek
>
> On 10 Aug 2018, at 06:59, Shailesh Jain <sh...@stellapps.com>
> wrote:
>
> Hi,
>
> I hit a similar issue yesterday, the task manager died suspiciously, no
> error logs in the task manager logs, but I see the following exceptions in
> the job manager logs:
>
> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting
>                                 - Association to [
> akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably
> failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
>         at akka.remote.ReliableDeliverySupervisor$$
> anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.remote.ReliableDeliverySupervisor.
> aroundReceive(Endpoint.scala:203)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> but almost 3 days later it hit this:
>
> 2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.
> executiongraph.ExecutionGraph        - Job Internal state machine job (
> 1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
> java.lang.Exception: TaskManager was lost/killed:
> 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>         at org.apache.flink.runtime.instance.SimpleSlot.
> releaseSlot(SimpleSlot.java:217)
>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>         at org.apache.flink.runtime.instance.SharedSlot.
> releaseSlot(SharedSlot.java:192)
>         at org.apache.flink.runtime.instance.Instance.markDead(
> Instance.java:167)
>         at org.apache.flink.runtime.instance.InstanceManager.
> unregisterTaskManager(InstanceManager.java:212)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>         at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>         at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>         at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.
> scala:123)
>         at org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at org.apache.flink.runtime.jobmanager.JobManager.
> aroundReceive(JobManager.scala:122)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
> DeathWatch.scala:46)
>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> followed by:
>
> 2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.
> executiongraph.ExecutionGraph        - Job Internal state machine job (
> 1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: < Attempt #2 (Source: Custom Source ->
> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID <
> fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [
> fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7,
> 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler:
> Number of instances=0, total number of slots=0, available slots=0
>         at org.apache.flink.runtime.jobmanager.scheduler.
> Scheduler.scheduleTask(Scheduler.java:263)
>         at org.apache.flink.runtime.jobmanager.scheduler.
> Scheduler.allocateSlot(Scheduler.java:142)
>         at org.apache.flink.runtime.executiongraph.Execution.lambda$
> allocateAndAssignSlotForExecution$1(Execution.java:440)
>         at java.util.concurrent.CompletableFuture.uniComposeStage(
> CompletableFuture.java:981)
>         at java.util.concurrent.CompletableFuture.thenCompose(
> CompletableFuture.java:2124)
>         at org.apache.flink.runtime.executiongraph.Execution.
> allocateAndAssignSlotForExecution(Execution.java:438)
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.
> allocateResourcesForAll(ExecutionJobVertex.java:503)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.
> scheduleEager(ExecutionGraph.java:900)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.
> scheduleForExecution(ExecutionGraph.java:854)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.
> restart(ExecutionGraph.java:1175)
>         at org.apache.flink.runtime.executiongraph.restart.
> ExecutionGraphRestartCallback.triggerFullRecovery(
> ExecutionGraphRestartCallback.java:59)
>         at org.apache.flink.runtime.executiongraph.restart.
> FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>         at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
> There are no error logs in task manager, and following is the last memory
> consumption log by task manager:
>
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.
> taskmanager.TaskManager              - Memory usage stats: [HEAP:
> 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.
> taskmanager.TaskManager              - Direct memory stats: Count: 115,
> Total Capacity: 38101792, Used Memory: 38101793
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.
> taskmanager.TaskManager              - Off-heap pool stats: [Code Cache:
> 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB
> (used/committed/max)], [Compressed Class Space: 11/17/1024 MB
> (used/committed/max)]
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.
> taskmanager.TaskManager              - Garbage collector stats: [G1 Young
> Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC
> TIME (ms): 152, GC COUNT: 2]
>
> So I think it rules out OOM as a cause for this crash.
>
> Any ideas/leads to debug this would be really helpful. The cluster is
> running on version 1.4.2.
>
> Thanks,
> Shailesh
>
> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <
> alexander.smirnoff@gmail.com> wrote:
>
>> Hi Piotr,
>>
>> I didn't find anything special in the logs before the failure.
>> Here are the logs, please take a look:
>>
>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing
>>
>> The configuration is:
>>
>> 3 task managers:
>> qafdsflinkw011.scl
>> qafdsflinkw012.scl
>> qafdsflinkw013.scl - lost connection
>>
>> 3 job  managers:
>> qafdsflinkm011.scl - the leader
>> qafdsflinkm012.scl
>> qafdsflinkm013.scl
>>
>> 3 zookeepers:
>> qafdsflinkzk011.scl
>> qafdsflinkzk012.scl
>> qafdsflinkzk013.scl
>>
>> Thank you,
>> Alex
>>
>>
>>
>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Does the issue really happen after 48 hours?
>>> Is there some indication of a failure in TaskManager log?
>>>
>>> If you will be still unable to solve the problem, please provide full
>>> TaskManager and JobManager logs.
>>>
>>> Piotrek
>>>
>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <
>>> alexander.smirnoff@gmail.com> wrote:
>>>
>>> One more question - I see a lot of line like the following in the logs
>>>
>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
>>> flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://
>>> flink@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is
>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://
>>> flink@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is
>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
>>> flink@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is
>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
>>> flink@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is
>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://
>>> flink@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is
>>> irrecoverably failed. Address cannot be quarantined without knowing the
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>>
>>>
>>> The host is available, but I don't understand where port number comes
>>> from. Task Manager uses another port (which is printed in logs on startup)
>>> Could you please help to understand why it happens?
>>>
>>> Thank you,
>>> Alex
>>>
>>>
>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <
>>> alexander.smirnoff@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I've assembled a standalone cluster of 3 task managers and 3 job
>>>> managers(and 3 ZK) following the instructions at
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html and
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>>>>
>>>> It works ok, but randomly, task managers becomes unavailable.
>>>> JobManager has exception like below in logs:
>>>>
>>>>
>>>> [2018-03-19 00:33:10,211] WARN Association with remote system
>>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed,
>>>> address is now gated for [5000] ms. Reason: [Association failed with
>>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by:
>>>> [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
>>>> (akka.remote.ReliableDeliverySupervisor)
>>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
>>>> flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>>> for too long. (more than 48.0 hours)
>>>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.
>>>> applyOrElse(Endpoint.scala:375)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(
>>>> Endpoint.scala:203)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>>>> java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>> ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>>>> l.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>>>> orkerThread.java:107)
>>>>
>>>> I can't find a reason for this exception, any ideas?
>>>>
>>>> Thank you,
>>>> Alex
>>>>
>>>
>>>
>
>

Re: Standalone cluster instability

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Please post full TaskManager logs, including stderr and stdout. (Have you checked the stderr/stdout for some messages?)

I can think of a couple of reasons:
1. process segfault
2. process killed by OS
3. OS failure

1. Should be visible as a message in the stderr/stdout files and can be caused by, for example, a JVM, RocksDB, or other native library/code bug.
2. Is your system maybe running out of memory? The kernel might kill the process if that's happening. You can also check system (Linux?) logs for errors that correlate in time; where those logs live depends on your OS.
3. This might be tricky, but I have seen kernel failures that prevented any messages from being logged, for example. Apart from this TaskManager failure, is your machine operating normally, without any other problems/crashes/restarts?

Piotrek

> On 10 Aug 2018, at 06:59, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Hi,
> 
> I hit a similar issue yesterday, the task manager died suspiciously, no error logs in the task manager logs, but I see the following exceptions in the job manager logs:
> 
> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting                                          - Association to [akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> but almost 3 days later it hit this:
> 
> 2018-08-08 13:22:00,061 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
> java.lang.Exception: TaskManager was lost/killed: 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>         at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>         at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> followed by:
> 
> 2018-08-08 13:22:20,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0
>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
>         at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
>         at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
>         at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>         at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
>         at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
>         at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>         at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 
> There are no error logs in task manager, and following is the last memory consumption log by task manager:
> 
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB (used/committed/max)]
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]
> 
> So I think it rules out OOM as a cause for this crash.
> 
> Any ideas/leads to debug this would be really helpful. The cluster is running on version 1.4.2.
> 
> Thanks,
> Shailesh
> 
> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <alexander.smirnoff@gmail.com> wrote:
> Hi Piotr,
> 
> I didn't find anything special in the logs before the failure. 
> Here are the logs, please take a look:
> 
> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing
> 
> The configuration is:
> 
> 3 task managers:
> qafdsflinkw011.scl 
> qafdsflinkw012.scl 
> qafdsflinkw013.scl - lost connection
> 
> 3 job  managers:
> qafdsflinkm011.scl - the leader
> qafdsflinkm012.scl 
> qafdsflinkm013.scl 
> 
> 3 zookeepers:
> qafdsflinkzk011.scl
> qafdsflinkzk012.scl
> qafdsflinkzk013.scl
> 
> Thank you,
> Alex
> 
> 
> 
> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <piotr@data-artisans.com> wrote:
> Hi,
> 
> Does the issue really happen after 48 hours? 
> Is there some indication of a failure in TaskManager log?
> 
> If you will be still unable to solve the problem, please provide full TaskManager and JobManager logs.
> 
> Piotrek
> 
>> On 21 Mar 2018, at 16:00, Alexander Smirnov <alexander.smirnoff@gmail.com> wrote:
>> 
>> One more question - I see a lot of line like the following in the logs
>> 
>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
>> 
>> 
>> The host is available, but I don't understand where port number comes from. Task Manager uses another port (which is printed in logs on startup)
>> Could you please help to understand why it happens?
>> 
>> Thank you,
>> Alex
>> 
>> 
>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <alexander.smirnoff@gmail.com> wrote:
>> Hello,
>> 
>> I've assembled a standalone cluster of 3 task managers and 3 job managers(and 3 ZK) following the instructions at 
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html and https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>> 
>> It works ok, but randomly, task managers becomes unavailable. JobManager has exception like below in logs:
>> 
>> 
>> [2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by: [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413] (akka.remote.ReliableDeliverySupervisor)
>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>         at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> I can't find a reason for this exception, any ideas?
>> 
>> Thank you,
>> Alex
> 
>