Posted to user@flink.apache.org by Subramanya Suresh <ss...@salesforce.com> on 2018/08/30 06:48:14 UTC

Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Hi, we are seeing a weird issue where one TaskManager is lost and never
re-allocated. Operators subsequently fail with
NoResourceAvailableException, and after 5 restarts (we use a FixedDelay
restart strategy with 5 attempts) the application goes down.
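
For reference, a fixed-delay restart strategy with 5 attempts could be expressed in flink-conf.yaml roughly as sketched below. Whether the strategy was set there or programmatically in the job is not stated in this thread, so the placement is an assumption; the 10 s delay is taken from the delayBetweenRestartAttempts=10000 value in the JobManager log quoted later in the thread.

```yaml
# Sketch only: fixed-delay restart strategy as described above.
# Assumption: configured via flink-conf.yaml rather than in job code.
restart-strategy: fixed-delay
# Give up (job goes to FAILED) after this many restart attempts.
restart-strategy.fixed-delay.attempts: 5
# Wait between attempts; matches delayBetweenRestartAttempts=10000 (ms) in the log.
restart-strategy.fixed-delay.delay: 10 s
```

With these values, any slot shortage that outlasts the five attempts takes the whole job down.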

   - We have explicitly set yarn.reallocate-failed: true and have not
   specified yarn.maximum-failed-containers (and we see
   “org.apache.flink.yarn.YarnApplicationMasterRunner             - YARN
   application tolerates 145 failed TaskManager containers before giving up”
   in the logs).
   - After the initial startup, where all 145 TaskManagers are requested, I
   never see any log line saying “Requesting new TaskManager container” to
   reallocate the failed container.
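
For reference, the YARN settings described in the list above would look roughly like this in flink-conf.yaml. This is a sketch: both keys are the ones named in this message, while the commented-out value is only illustrative and not taken from the actual configuration.

```yaml
# Sketch of the YARN-related settings described above (Flink 1.4.x on YARN).

# Request a replacement container when a TaskManager container fails.
yarn.reallocate-failed: true

# Not specified in the setup described here; the quoted log line reports a
# tolerance of 145 failed containers for the 145 TaskManagers.
# Illustrative value only (an assumption, not the actual config):
# yarn.maximum-failed-containers: 145
```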


2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net (dataPort=124)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
    at akka.actor.ActorCell.invoke(ActorCell.scala:494)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
    at akka.actor.ActorCell.invoke(ActorCell.scala:494)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager blahabc.sfdc.net:/1.1.1.1. Number of registered task managers 144. Number of available slots 720.

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager - Received message for non-existing checkpoint 1

2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 0 checkpoints in ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 0 checkpoints from storage.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #1


*After 5 retries of our SQL query execution graph (we have configured a
fixed-delay restart strategy with 5 attempts), it outputs the following:*


2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job ab7e96a1659fbb4bfb3c6cd9bccb0335

2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Shutting down

2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Removing /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter - Shutting down.

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter - Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:27,997 INFO  org.apache.flink.runtime.history.FsJobArchivist - Job ab7e96a1659fbb4bfb3c6cd9bccb0335 has been archived at hdfs:/savedSearches/prod/completed-jobs/ab7e96a1659fbb4bfb3c6cd9bccb0335.


Cheers,

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Subramanya Suresh <ss...@salesforce.com>.
Hi Till,
Thanks for the information. Please let me know what additional details you
would like to know.
Let's follow this up on the Slack channel c-salesforce-poc (which has Jeff
and Konstantin).

Sincerely,

On Tue, Sep 18, 2018 at 1:14 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi,
>
> the quarantining is not really solvable in 1.4.x without restarting the Flink
> component. Thus, I would recommend upgrading to the latest Flink version
> (1.6.0; 1.6.1 will be released later this week).
>
> In order to tell which route would be shorter, I would need to know a
> few more details about the problems you are facing.
>
> Cheers,
> Till
>
> On Mon, Sep 17, 2018 at 9:24 PM Subramanya Suresh <ss...@salesforce.com>
> wrote:
>
>> Hi Till,
>> Update on this. We are still working through the 1.6.0 setup and run. In a
>> separate thread, we are running into issues, and we sense more on the horizon
>> before we get it working.
>> We are under some tight timelines, so I want to ask how confident you are
>> that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2
>> is the longer route.
>>
>> Sincerely,
>>
>> On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <
>> ssuresh@salesforce.com> wrote:
>>
>>> Hi Till,
>>> *After taskmanager.exit-on-fatal-akka-error: true*
>>> I do not see any unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]
>>> since we turned on taskmanager.exit-on-fatal-akka-error: true, I am not
>>> sure if that is a coincidence or a direct impact of this change.
>>>
>>> Quick update: I can confirm our issue still happens even with the flag
>>> set to true. With a proper restart strategy (rate-based, one that gives it
>>> enough time) the job can recover from container failures like the first case
>>> below, but it is not able to recover from "detected unreachable" issues like
>>> the second case below.
>>>
>>> We are currently using the configuration below. So I guess the only
>>> options left are to increase akka.watch.heartbeat.pause or move to 1.6 as
>>> you suggested.
>>>
>>>
>>> akka.client.timeout: 600s
>>> akka.ask.timeout: 600s
>>> akka.lookup.timeout: 600s
>>> akka.watch.heartbeat.pause: 120s
>>>
>>>
>>> ___________________________________________________
>>> 2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100
>>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e29_1536261974019_1134_01_000036 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
>>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
>>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
>>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hellow-world2-13:41157/user/taskmanager terminated.
>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>> 2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from state RUNNING to FAILING.
>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>> 2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@hello-world2-13:41157] has failed, address is now gated for [5000] ms. Reason: [Disassociated].
>>> This container, 2-13, had just received:
>>> 2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>> ___________________________________________________
>>> But then it failed with the same old error:
>>> 2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@hello-world3-3:44607]
>>> 2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.
>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>> ___________________________________________________
>>>
>>> On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Subramanya,
>>>>
>>>> if the container is still running and the TM can simply not connect to
>>>> the JobManager, then the ResourceManager does not see a problem. The RM
>>>> thinks in terms of containers and as long as n containers are running, it
>>>> won't start new ones. That's the reason why the TM should exit in order to
>>>> terminate the container.
>>>>
>>>> Have you tried using a newer Flink version? Lately we have reworked a
>>>> good part of Flink's distributed architecture and added resource elasticity
>>>> (starting with Flink 1.5).
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <
>>>> ssuresh@salesforce.com> wrote:
>>>>
>>>>> Hi,
>>>>> With some additional research,
>>>>>
>>>>> *Before the flag*
>>>>> I realized that for failed containers (those that exited for a specific
>>>>> reason) we were still requesting a new TM container and launching a TM. But
>>>>> for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>>>>> issue I do not see the container marked as failed, nor a subsequent request
>>>>> for a TM.
>>>>>
>>>>> *After taskmanager.exit-on-fatal-akka-error: true*
>>>>> I do not see any unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]
>>>>> since we turned on taskmanager.exit-on-fatal-akka-error: true, I am
>>>>> not sure if that is a coincidence or a direct impact of this change.
>>>>>
>>>>> *Our Issue:*
>>>>> I realized we are still exiting the application, i.e. failing when
>>>>> containers are lost. The reason is that before
>>>>> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
>>>>> container, launch a TM in it, and have it reported as started, the
>>>>> org.apache.flink.runtime.jobmanager.scheduler throws a
>>>>> NoResourceAvailableException that causes a failure. In our case we had a
>>>>> fixed-delay restart strategy with 5 attempts, and we are running out of
>>>>> them because of this. I am looking to solve this with a
>>>>> FailureRateRestartStrategy over a 2 minute interval (10 second restart
>>>>> delay, >12 failures), which lets the TM come back (takes about 50 seconds).
>>>>>
>>>>> *Flink Bug*
>>>>> But I cannot help but wonder why there is no interaction between the
>>>>> ResourceManager and JobManager, i.e. why does the JobManager continue
>>>>> processing despite not having the required TMs?
>>>>>
>>>>> Logs to substantiate what I said above (only relevant).
>>>>>
>>>>> 2018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Container container_e28_1535135887442_1381_01_000097
>>>>> completed successfully with diagnostics: Container released by application
>>>>>
>>>>>
>>>>> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>                       - Association with remote system [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>>> now gated for [5000] ms. Reason: [Disassociated]
>>>>> 2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Container container_e27_1535135887442_1381_01_000102
>>>>> failed. Exit status: -102
>>>>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Diagnostics for container container_e27_1535135887442_1381_01_000102
>>>>> in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by
>>>>> scheduler
>>>>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Total number of failed containers so far: 1
>>>>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Requesting new TaskManager container with 20000 megabytes
>>>>> memory. Pending requests: 1
>>>>> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Task manager akka.tcp://flink@hello-world9-
>>>>> 27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
>>>>> 2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>>>> switched from state RUNNING to FAILING.
>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>> container_e27_1535135887442_1381_01_000102 @
>>>>> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
>>>>> 2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>>            - Unregistered task manager hello-world9-27-crz.ops.sfdc.
>>>>> net/11.11.35.220. Number of registered task managers 144. Number of
>>>>> available slots 720
>>>>>
>>>>> 2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Received new container: container_e28_1535135887442_1381_01_000147
>>>>> - Remaining pending container requests: 0
>>>>> 2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Launching TaskManager in container ContainerInLaunch @
>>>>> 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147,
>>>>> NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>>> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>>>> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
>>>>> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>                       - Association with remote system [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
>>>>>                   - Remote connection to [null] failed with
>>>>> java.net.ConnectException: Connection refused:
>>>>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
>>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>                       - Association with remote system [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>>> 2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>>>> switched from state RESTARTING to CREATED.
>>>>>
>>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>                       - Association with remote system [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>>> 2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>>>> switched from state RESTARTING to CREATED.
>>>>> 2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>>>> switched from state CREATED to RUNNING.
>>>>> 2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>>>> switched from state RUNNING to FAILING.
>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>> Not enough free slots available to run the job. You can decrease the
>>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>>> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
>>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
>>>>> =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
>>>>> _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
>>>>> cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
>>>>> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - TaskManager container_e28_1535135887442_1381_01_000147
>>>>> has started.
>>>>> 2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Container container_e28_1535135887442_1381_01_000147
>>>>> failed. Exit status: -102
>>>>> 2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Diagnostics for container container_e28_1535135887442_1381_01_000147
>>>>> in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by
>>>>> scheduler
>>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Task manager akka.tcp://flink@hello-world9-
>>>>> 27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
>>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Total number of failed containers so far: 2
>>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>>            - Unregistered task manager hello-world9-27-crz.ops.sfdc.
>>>>> net/11.11.35.220. Number of registered task managers 144. Number of
>>>>> available slots 720.
>>>>>
>>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Received new container: container_e28_1535135887442_1381_01_000202
>>>>> - Remaining pending container requests: 0
>>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Launching TaskManager in container ContainerInLaunch @
>>>>> 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202,
>>>>> NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>>> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>>>> 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
>>>>> 2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Received new container: container_e28_1535135887442_1381_01_000203
>>>>> - Remaining pending container requests: 0
>>>>> 2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Returning excess container container_e28_1535135887442_
>>>>> 1381_01_000203
>>>>>
>>>>> Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202
>>>>> has started." message.
>>>>> Instead I see:
>>>>> 2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>>>> switched from state FAILING to FAILED.
>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>> Not enough free slots available to run the job. You can decrease the
>>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>>> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
>>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>>>>>
>>>>>
>>>>> 2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Shutting down cluster with status FAILED : The monitored
>>>>> job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
>>>>> 2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>               - Unregistering application from the YARN Resource Manager
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <
>>>>> ssuresh@salesforce.com> wrote:
>>>>>
>>>>>> Thanks Till,
>>>>>> I do not see any Akka-related messages in that TaskManager after the
>>>>>> initial startup. It seemed like all was well. So after the RemoteWatcher
>>>>>> detects it as unreachable and the JobManager unregisters it, I do not see any
>>>>>> other activity in the JobManager with regard to reallocation etc.
>>>>>> - Does the quarantining of the TaskManager not happen until
>>>>>> exit-on-fatal-akka-error is turned on?
>>>>>> - Do the JobManager and the TaskManager try to reconnect to each
>>>>>> other again? Is there a different setting for it?
>>>>>> - Does the JobManager not reallocate a TaskManager despite it being
>>>>>> unregistered, until the TaskManager exits? I think it should, especially
>>>>>> if it is not trying to establish a connection again.
>>>>>>
>>>>>> I will give the flag a try.
>>>>>>
>>>>>> Sincerely,
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Could you check whether
>>>>>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running?
>>>>>>> E.g., does it try to reconnect to the JobManager? If this is the case, then
>>>>>>> the container is still running and the YarnFlinkResourceManager thinks that
>>>>>>> everything is alright. You can make a TaskManager kill itself when it gets
>>>>>>> quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in
>>>>>>> the `flink-conf.yaml`.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>>>>>>> ssuresh@salesforce.com> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>> Greatly appreciate your reply.
>>>>>>>> We use version 1.4.2. I do not see anything unusual in the logs for the
>>>>>>>> TM that was lost. Note: I have looked at many such failures and see the
>>>>>>>> same pattern as below.
>>>>>>>>
>>>>>>>> The JM logs above had most of what I had, but the below is what I
>>>>>>>> have when I search for flink.yarn (we have huge logs otherwise, given the
>>>>>>>> number of SQL queries we run). The gist is: Akka detects the TM as
>>>>>>>> unreachable, the TM is marked lost and unregistered by the JM, operators
>>>>>>>> start failing with NoResourceAvailableException since there is one less
>>>>>>>> TM, and 5 retry attempts later the job goes down.
>>>>>>>>
>>>>>>>> ………….
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.
>>>>>>>> YarnFlinkResourceManager                - TaskManager
>>>>>>>> container_e27_1535135887442_0906_01_000124 has started.
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.
>>>>>>>> YarnFlinkResourceManager                - TaskManager
>>>>>>>> container_e27_1535135887442_0906_01_000159 has started.
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Submitting job
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Using restart strategy
>>>>>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>>>>> delayBetweenRestartAttempts=10000) for
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Running initialization on master for job
>>>>>>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Successfully ran initialization on
>>>>>>>> master in 0 ms.
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Using application-defined state backend
>>>>>>>> for checkpoint/savepoint metadata: File State Backend @
>>>>>>>> hdfs://security-temp/savedSearches/checkpoint.
>>>>>>>>
>>>>>>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Scheduling job
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>>>
>>>>>>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Attempting to recover all jobs.
>>>>>>>>
>>>>>>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - There are 1 jobs to recover. Starting
>>>>>>>> the job recovery.
>>>>>>>>
>>>>>>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Attempting to recover job
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>>>
>>>>>>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Ignoring job recovery for
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>>>>>>
>>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Task manager akka.tcp://flink@
>>>>>>>> blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>>
>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$
>>>>>>>> handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>
>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$
>>>>>>>> handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>
>>>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Received message for non-existing
>>>>>>>> checkpoint 1
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Job with ID
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED.
>>>>>>>> Shutting down session
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Stopping JobManager with final
>>>>>>>> application status FAILED and diagnostics: The monitored job with ID
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.
>>>>>>>> YarnFlinkResourceManager                - Shutting down cluster
>>>>>>>> with status FAILED : The monitored job with ID
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.
>>>>>>>> YarnFlinkResourceManager                - Unregistering
>>>>>>>> application from the YARN Resource Manager
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Deleting yarn application files under
>>>>>>>> hdfs://security-temp/user/sec-data-app/.flink/application_
>>>>>>>> 1535135887442_0906.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Stopping JobManager akka.tcp://flink@
>>>>>>>> blahxyz.sfdc.net:1235/user/jobmanager.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Actor system shut down timed out.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Shutdown completed. Stopping JVM.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <
>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Subramanya,
>>>>>>>>>
>>>>>>>>> in order to help you I need a little bit of context. Which version
>>>>>>>>> of Flink are you running? The configuration yarn.reallocate-failed has
>>>>>>>>> been deprecated since Flink 1.1 and no longer has any effect.
>>>>>>>>>
>>>>>>>>> What would be helpful is to get the full jobmanager log from you.
>>>>>>>>> If the YarnFlinkResourceManager gets notified that a container has failed,
>>>>>>>>> it should restart this container (it will do this 145 times). So if the
>>>>>>>>> YarnFlinkResourceManager does not get notified about a completed container,
>>>>>>>>> then this might indicate that the container is still running. So it would
>>>>>>>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>>>>>>>> say.
>>>>>>>>>
>>>>>>>>> Moreover, do you see the same problem occur when using the latest
>>>>>>>>> release Flink 1.6.0?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>>>>>>> ssuresh@salesforce.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>>>>>> then never re-allocated and subsequently operators fail with
>>>>>>>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>>>>>>>> restarts of 5) the application goes down.
>>>>>>>>>>
>>>>>>>>>>    - We have explicitly set *yarn.reallocate-failed: *true and
>>>>>>>>>>    have not specified the yarn.maximum-failed-containers (and see
>>>>>>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner
>>>>>>>>>>      - YARN application tolerates 145 failed TaskManager
>>>>>>>>>>    containers before giving up” in the logs).
>>>>>>>>>>    - After the initial startup where all 145 TaskManagers are
>>>>>>>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>>>>>>>    container” to reallocate failed container.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>>>>>>                             - Detected unreachable: [akka.tcp://
>>>>>>>>>> flink@blahabc.sfdc.net:123]
>>>>>>>>>>
>>>>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.
>>>>>>>>>> YarnJobManager                          - Task manager
>>>>>>>>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
>>>>>>>>>> terminated.
>>>>>>>>>>
>>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>>>>>>> (dataPort=124)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.
>>>>>>>>>> releaseSlot(SimpleSlot.java:217)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
>>>>>>>>>> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.
>>>>>>>>>> releaseSlot(SharedSlot.java:192)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(
>>>>>>>>>> Instance.java:167)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.
>>>>>>>>>> unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$
>>>>>>>>>> apache$flink$runtime$jobmanager$JobManager$$
>>>>>>>>>> handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>>>>>>> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>>>
>>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.clusterframework.
>>>>>>>>>> ContaineredJobManager$$anonfun$handleContainerMessage$1.
>>>>>>>>>> applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>>>
>>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$
>>>>>>>>>> handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>>>
>>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
>>>>>>>>>> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.
>>>>>>>>>> scala:49)
>>>>>>>>>>
>>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>>>>> LogMessages.scala:33)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>>>>> LogMessages.scala:28)
>>>>>>>>>>
>>>>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.
>>>>>>>>>> scala:123)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.
>>>>>>>>>> applyOrElse(LogMessages.scala:28)
>>>>>>>>>>
>>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.
>>>>>>>>>> aroundReceive(JobManager.scala:122)
>>>>>>>>>>
>>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>>>>
>>>>>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
>>>>>>>>>> DeathWatch.scala:46)
>>>>>>>>>>
>>>>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>>>>
>>>>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>>>>
>>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>>>>
>>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>>>>
>>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>>>>
>>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>>>>
>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>>>>>>>> ForkJoinTask.java:260)
>>>>>>>>>>
>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>>>>>>> runTask(ForkJoinPool.java:1339)
>>>>>>>>>>
>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>>>>>>>> ForkJoinPool.java:1979)
>>>>>>>>>>
>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>>>>
>>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>>> container_e27_1535135887442_0906_01_000039 @
>>>>>>>>>> blahabc.sfdc.net:42414 (dataPort=124)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.
>>>>>>>>>> releaseSlot(SimpleSlot.java:217)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
>>>>>>>>>> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.
>>>>>>>>>> releaseSlot(SharedSlot.java:192)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(
>>>>>>>>>> Instance.java:167)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.
>>>>>>>>>> unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$
>>>>>>>>>> apache$flink$runtime$jobmanager$JobManager$$
>>>>>>>>>> handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>>>>>>> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>>>
>>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.clusterframework.
>>>>>>>>>> ContaineredJobManager$$anonfun$handleContainerMessage$1.
>>>>>>>>>> applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>>>
>>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$
>>>>>>>>>> handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>>>
>>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
>>>>>>>>>> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.
>>>>>>>>>> scala:49)
>>>>>>>>>>
>>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>>>>> LogMessages.scala:33)
>>>>>>>>>>
>>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>>>>> LogMessages.scala:28)
>>>>>>>>>>
>>>>>>>>>>



Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Till Rohrmann <tr...@apache.org>.
Hi,

the quarantining is not really solvable in 1.4.x without restarting the Flink
component. Thus, I would recommend upgrading to the latest Flink version
(1.6.0; 1.6.1 will be released later this week).

In order to tell which would be the shorter route, I would need to know a few
more details about the problems you are facing.

Cheers,
Till

On Mon, Sep 17, 2018 at 9:24 PM Subramanya Suresh <ss...@salesforce.com>
wrote:

> Hi Till,
> An update on this: we are still working through the 1.6.0 setup and run. In a
> separate thread, we are running into issues, and sense more on the horizon
> before we get it working.
> We are under some tight timelines, so I want to ask how confident you are
> that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2
> is the longer route.
>
> Sincerely,
>
> On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <ssuresh@salesforce.com
> > wrote:
>
>> Hi Till,
>> *After taskmanager.exit-on-fatal-akka-error: true*
>> I do not see any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am not
>> sure if that is a coincidence or a direct impact of this change.
>>
>> Quick update: I can confirm our issue still happens even with the flag
>> set to true. With a proper restart strategy (rate based, which gives it
>> enough time) the job can recover from container failures like the first case
>> below, but it is not able to recover from "Detected unreachable" issues like
>> the second case below.
>>
>> We are currently using the configuration below. So I guess the only
>> options left are to increase akka.watch.heartbeat.pause or move to 1.6 as you
>> suggested.
>>
>>
>> akka.client.timeout: 600s
>> akka.ask.timeout: 600s
>> akka.lookup.timeout: 600s
>> akka.watch.heartbeat.pause: 120s
>>
>>
>> ___________________________________________________
>> 2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>> - Container container_e29_1536261974019_1134_01_000036 failed. Exit status:
>> -100
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container
>> container_e29_1536261974019_1134_01_000036 in state COMPLETE :
>> exitStatus=-100 diagnostics=Container released on a *lost* node
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed
>> containers so far: 1
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager
>> container with 20000 megabytes memory. Pending requests: 1
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnJobManager - Task manager
>> akka.tcp://flink@hello-world2-13:41157/user/taskmanager terminated.
>> at
>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> 2018-09-11 16:30:44,868 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>> streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from
>> state RUNNING to FAILING.
>> at
>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> 2018-09-11 16:30:49,700 WARN
>> akka.remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://flink@hello-world2-13:41157] has failed, address is now
>> gated for [5000] ms. Reason: [Disassociated].
>> This container, 2-13, has just received:
>> 2018-09-11 16:30:47,195 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> ___________________________________________________
>> But then it failed with the same old issue:
>> 2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected
>> unreachable: [akka.tcp://flink@hello-world3-3:44607]
>> 2018-09-11 16:42:58,409 INFO
>> org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@
>> hello-world3-3/user/taskmanager terminated.
>> at
>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> ___________________________________________________
>>
>> On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Subramanya,
>>>
>>> if the container is still running and the TM can simply not connect to
>>> the JobManager, then the ResourceManager does not see a problem. The RM
>>> thinks in terms of containers and as long as n containers are running, it
>>> won't start new ones. That's the reason why the TM should exit in order to
>>> terminate the container.
>>>
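
The behaviour described above can be sketched roughly as follows. This is only an illustrative Python model, not Flink's or YARN's actual code: the `Container` class, its fields, and both helper functions are hypothetical names. It assumes the resource manager counts containers whose process is still running, regardless of whether the JobManager can reach them.

```python
from dataclasses import dataclass


@dataclass
class Container:
    """Hypothetical view of one TaskManager container (not a Flink class)."""
    container_id: str
    process_running: bool    # what the resource manager is told about
    reachable_from_jm: bool  # what the JobManager's remote watcher sees


def containers_to_request(target: int, containers: list) -> int:
    """New containers to request: only containers that have exited count."""
    running = sum(1 for c in containers if c.process_running)
    return max(0, target - running)


def usable_task_managers(containers: list) -> int:
    """TaskManagers the JobManager can actually schedule onto."""
    return sum(1 for c in containers
               if c.process_running and c.reachable_from_jm)


# 145 containers, one of which is alive but unreachable from the JobManager.
cluster = [Container(f"c{i}", True, True) for i in range(144)]
cluster.append(Container("c144", True, False))

print(containers_to_request(145, cluster))  # 0: nothing looks wrong to the RM
print(usable_task_managers(cluster))        # 144: the job is one TM short

# Once the unreachable TaskManager exits, its container completes.
cluster[-1].process_running = False
print(containers_to_request(145, cluster))  # 1: a replacement is requested
```

In this model the gap between the two counts is what leaves the job short of slots, and making the unreachable TaskManager exit is what closes it.
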
>>> Have you tried using a newer Flink version? Lately we have reworked a
>>> good part of Flink's distributed architecture and added resource elasticity
>>> (starting with Flink 1.5).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ss...@salesforce.com>
>>> wrote:
>>>
>>>> Hi,
>>>> With some additional research,
>>>>
>>>> *Before the flag*
>>>> I realized that for failed containers (those that exited for a specific
>>>> reason) we were still requesting a new TM container and launching a TM.
>>>> But for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>>>> issue I do not see the container marked as failed, nor a subsequent
>>>> request for a TM.
>>>>
>>>> *After taskmanager.exit-on-fatal-akka-error: true*
>>>> I do not see any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>>>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am
>>>> not sure if that is a coincidence or a direct impact of this change.
>>>>
>>>> *Our Issue:*
>>>> I realized we are still exiting the application, i.e. failing, when
>>>> containers are lost. The reason is that before
>>>> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
>>>> container, launch the TM, and see it reported as started, the
>>>> org.apache.flink.runtime.jobmanager.scheduler throws a
>>>> NoResourceAvailableException that causes a failure. In our case we had a
>>>> fixed-delay restart strategy with 5 attempts, and we are running out of
>>>> them because of this. I am looking to solve this with a
>>>> FailureRateRestartStrategy over a 2 minute interval (10 second restart
>>>> delay, >12 failures), which lets the TM come back (takes about 50 seconds).
>>>>
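
The restart-budget arithmetic above can be checked with a small simulation. This is a minimal Python sketch under simplifying assumptions, not Flink's implementation: `simulate`, `fixed_delay` and `failure_rate` are hypothetical helpers, every restart attempted before the replacement TM is back is assumed to fail immediately, and the failure-rate rule is modelled as "give up once more than N failures fall inside a sliding window".

```python
def simulate(recovery_s, delay_s, can_restart):
    """Return (final_state, time_s) for a job whose first failure is at t=0.

    Assumption: a restart attempted at time t only succeeds if the
    replacement TaskManager is back by then (t >= recovery_s).
    """
    failures = []
    t = 0
    while True:
        failures.append(t)               # the job fails at time t
        if not can_restart(failures, t):
            return ("FAILED", t)         # restart budget exhausted
        t += delay_s                     # wait, then restart
        if t >= recovery_s:
            return ("RUNNING", t)        # enough slots are available again


def fixed_delay(max_attempts):
    """Allow at most max_attempts restarts in total."""
    return lambda failures, now: len(failures) <= max_attempts


def failure_rate(max_failures, interval_s):
    """Allow a restart while at most max_failures fall inside the window."""
    def can_restart(failures, now):
        recent = [f for f in failures if now - f <= interval_s]
        return len(recent) <= max_failures
    return can_restart


print(simulate(50, 10, fixed_delay(5)))         # ('RUNNING', 50)
print(simulate(60, 10, fixed_delay(5)))         # ('FAILED', 50)
print(simulate(60, 10, failure_rate(12, 120)))  # ('RUNNING', 60)
```

Under this model, 5 attempts with a 10 second delay cover at most 50 seconds, which leaves no margin for a TM that needs about 50 seconds to come back, while a budget of 12 failures per 2 minutes tolerates a recovery of up to roughly 2 minutes.
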
>>>> *Flink Bug*
>>>> But I cannot help but wonder why there is no interaction between the
>>>> ResourceManager and JobManager, i.e. why is the JobManager continuing with
>>>> the processing despite not having the required TMs?
>>>>
>>>> Logs to substantiate what I said above (only the relevant ones).
>>>>
>>>> 2018-09-03 06:17:13,932 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
>>>> container_e28_1535135887442_1381_01_000097 completed successfully with
>>>> diagnostics: Container released by application
>>>>
>>>>
>>>> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     - Association with remote system [akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>> now gated for [5000] ms. Reason: [Disassociated]
>>>> 2018-09-03 06:34:19,214 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
>>>> container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
>>>> 2018-09-03 06:34:19,215 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
>>>> for container container_e27_1535135887442_1381_01_000102 in state COMPLETE
>>>> : exitStatus=-102 diagnostics=Container preempted by scheduler
>>>> 2018-09-03 06:34:19,215 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Total
>>>> number of failed containers so far: 1
>>>> 2018-09-03 06:34:19,215 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting
>>>> new TaskManager container with 20000 megabytes memory. Pending requests: 1
>>>> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                     - Task manager akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager
>>>> terminated.
>>>> 2018-09-03 06:34:19,218 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RUNNING to FAILING.
>>>> java.lang.Exception: TaskManager was lost/killed:
>>>> container_e27_1535135887442_1381_01_000102 @
>>>> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
>>>> 2018-09-03 06:34:19,466 INFO
>>>> org.apache.flink.runtime.instance.InstanceManager             -
>>>> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
>>>> Number of registered task managers 144. Number of available slots 720
>>>>
>>>> 2018-09-03 06:34:24,717 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>>>> new container: container_e28_1535135887442_1381_01_000147 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:24,717 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>>>> TaskManager in container ContainerInLaunch @ 1535956464717: Container:
>>>> [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
>>>> hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>>> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
>>>> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     - Association with remote system [akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
>>>>                   - Remote connection to [null] failed with
>>>> java.net.ConnectException: Connection refused:
>>>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     - Association with remote system [akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>> 2018-09-03 06:34:34,540 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RESTARTING to CREATED.
>>>> 2018-09-03 06:34:35,044 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state CREATED to RUNNING.
>>>> 2018-09-03 06:34:35,195 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RUNNING to FAILING.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
>>>> _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
>>>> cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
>>>> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>>>>
>>>>
>>>>
>>>>
>>>> 2018-09-03 06:34:39,248 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager
>>>> container_e28_1535135887442_1381_01_000147 has started.
>>>> 2018-09-03 06:34:45,235 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
>>>> container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
>>>> 2018-09-03 06:34:45,235 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
>>>> for container container_e28_1535135887442_1381_01_000147 in state COMPLETE
>>>> : exitStatus=-102 diagnostics=Container preempted by scheduler
>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                     - Task manager akka.tcp://
>>>> flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager
>>>> terminated.
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Total
>>>> number of failed containers so far: 2
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.runtime.instance.InstanceManager             -
>>>> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
>>>> Number of registered task managers 144. Number of available slots 720.
>>>>
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>>>> new container: container_e28_1535135887442_1381_01_000202 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>>>> TaskManager in container ContainerInLaunch @ 1535956485236: Container:
>>>> [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId:
>>>> hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>>> 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
>>>> 2018-09-03 06:34:45,241 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>>>> new container: container_e28_1535135887442_1381_01_000203 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:45,241 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Returning
>>>> excess container container_e28_1535135887442_1381_01_000203
>>>>
>>>> Notice there is no "TaskManager
>>>> container_e28_1535135887442_1381_01_000202 has started." message.
>>>> Instead I see:
>>>> 2018-09-03 06:34:56,894 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state FAILING to FAILED.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>>>>
>>>>
>>>> 2018-09-03 06:34:57,005 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting
>>>> down cluster with status FAILED : The monitored job with ID
>>>> 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
>>>> 2018-09-03 06:34:57,007 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>> Unregistering application from the YARN Resource Manager
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <
>>>> ssuresh@salesforce.com> wrote:
>>>>
>>>>> Thanks Till,
>>>>> I do not see any Akka related messages in that TaskManager after the
>>>>> initial startup. It seemed like all was well. So after the RemoteWatcher
>>>>> detects it as unreachable and the JobManager unregisters it, I do not see
>>>>> any other activity in the JobManager with regard to reallocation etc.
>>>>> - Does the quarantining of the TaskManager not happen until
>>>>> exit-on-fatal-akka-error is turned on?
>>>>> - Do the JobManager and the TaskManager try to reconnect to each
>>>>> other again? Is there a different setting for it?
>>>>> - Does the JobManager not reallocate a TaskManager despite it being
>>>>> unregistered, until the TaskManager exits? I think it should, especially
>>>>> if it is not trying to establish a connection again.
>>>>>
>>>>> I will give the flag a try.
>>>>>
>>>>> Sincerely,
>>>>>
>>>>>
>>>>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Could you check whether akka.tcp://
>>>>>> flink@blahabc.sfdc.net:123/user/taskmanager is still running? E.g.,
>>>>>> does it try to reconnect to the JobManager? If this is the case, then the
>>>>>> container is still running and the YarnFlinkResourceManager thinks that
>>>>>> everything is alright. You can make a TaskManager kill itself when
>>>>>> it gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true`
>>>>>> in the `flink-conf.yaml`.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>>>>>> ssuresh@salesforce.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Greatly appreciate your reply.
>>>>>>> We use version 1.4.2. I do not see anything unusual in the logs for
>>>>>>> the TM that was lost. Note: I have looked at many such failures and see the
>>>>>>> same pattern below.
>>>>>>>
>>>>>>> The JM logs above had most of what I had, but the below is what I
>>>>>>> have when I search for flink.yarn (we have huge logs otherwise, given the
>>>>>>> amount of SQL queries we run). The gist is: Akka detects unreachable, TM
>>>>>>> marked lost and unregistered by JM, operators start failing with
>>>>>>> NoResourceAvailableException since there was one less TM, and 5 retry
>>>>>>> attempts later the job goes down.
>>>>>>>
>>>>>>> ………….
>>>>>>>
>>>>>>> 2018-08-29 23:02:41,216 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,095 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Submitting job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Using restart strategy
>>>>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Running initialization on master for job
>>>>>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Successfully ran initialization on master
>>>>>>> in 0 ms.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Using application-defined state backend
>>>>>>> for checkpoint/savepoint metadata: File State Backend @
>>>>>>> hdfs://security-temp/savedSearches/checkpoint.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Scheduling job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>>
>>>>>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Attempting to recover all jobs.
>>>>>>>
>>>>>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - There are 1 jobs to recover. Starting the
>>>>>>> job recovery.
>>>>>>>
>>>>>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Attempting to recover job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>>
>>>>>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Ignoring job recovery for
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>>>>>
>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Task manager akka.tcp://flink@
>>>>>>> blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>
>>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Received message for non-existing
>>>>>>> checkpoint 1
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down
>>>>>>> session
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Stopping JobManager with final
>>>>>>> application status FAILED and diagnostics: The monitored job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,129 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> Shutting down cluster with status FAILED : The monitored job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,146 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> Unregistering application from the YARN Resource Manager
>>>>>>>
>>>>>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Deleting yarn application files under
>>>>>>> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>>>>>
>>>>>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Stopping JobManager akka.tcp://flink@
>>>>>>> blahxyz.sfdc.net:1235/user/jobmanager.
>>>>>>>
>>>>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Actor system shut down timed out.
>>>>>>>
>>>>>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Shutdown completed. Stopping JVM.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <trohrmann@apache.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi Subramanya,
>>>>>>>>
>>>>>>>> in order to help you I need a little bit of context. Which version
>>>>>>>> of Flink are you running? The configuration yarn.reallocate-failed has
>>>>>>>> been deprecated since Flink 1.1 and no longer has any effect.
>>>>>>>>
>>>>>>>> What would be helpful is to get the full jobmanager log from you.
>>>>>>>> If the YarnFlinkResourceManager gets notified that a container has failed,
>>>>>>>> it should restart this container (it will do this 145 times). So if the
>>>>>>>> YarnFlinkResourceManager does not get notified about a completed container,
>>>>>>>> then this might indicate that the container is still running. So it would
>>>>>>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>>>>>>> say.
>>>>>>>>
>>>>>>>> Moreover, do you see the same problem occur when using the latest
>>>>>>>> release Flink 1.6.0?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>>>>>> ssuresh@salesforce.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>>>>> then never re-allocated and subsequently operators fail with
>>>>>>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>>>>>>> restarts of 5) the application goes down.
>>>>>>>>>
>>>>>>>>>    - We have explicitly set *yarn.reallocate-failed: *true and
>>>>>>>>>    have not specified the yarn.maximum-failed-containers (and see
>>>>>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>>>>>>    YARN application tolerates 145 failed TaskManager containers before giving
>>>>>>>>>    up” in the logs).
>>>>>>>>>    - After the initial startup where all 145 TaskManagers are
>>>>>>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>>>>>>    container” to reallocate failed container.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>>>>>                           - Detected unreachable: [akka.tcp://
>>>>>>>>> flink@blahabc.sfdc.net:123]
>>>>>>>>>
>>>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>>                         - Task manager akka.tcp://
>>>>>>>>> flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>>>
>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>>>>>> (dataPort=124)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>>
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org
>>>>>>>>> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>>>>>
>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>>>
>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>>>
>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>>>
>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>
>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>> container_e27_1535135887442_0906_01_000039 @
>>>>>>>>> blahabc.sfdc.net:42414 (dataPort=124)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>>
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org
>>>>>>>>> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>>>>
>>>>>>>>>

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Subramanya Suresh <ss...@salesforce.com>.
Hi Till,
Update on this. We are still wading through the 1.6.0 setup and run. In a
separate thread, we are running into issues, and we sense more on the horizon
before we get it working.
We are under some tight timelines, so I want to ask how confident you are
that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2
is the longer route.

Sincerely,

On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <ss...@salesforce.com>
wrote:

> Hi Till,
> *After `taskmanager.exit-on-fatal-akka-error: true`*
> I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am
> not sure whether that is a coincidence or a direct impact of this change.
>
> Quick update: I can confirm our issue still happens even with the flag
> set to true. With a proper restart strategy (rate-based, one that gives it
> enough time) the job can recover from container failures like the first case
> below, but it cannot recover from "Detected unreachable" issues like the
> second case below.
>
> We are currently using the configuration below, so I guess the only
> options left are to increase akka.watch.heartbeat.pause or move to 1.6 as
> you suggested.
>
>
> akka.client.timeout: 600s
> akka.ask.timeout: 600s
> akka.lookup.timeout: 600s
> akka.watch.heartbeat.pause: 120s
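
To make the heartbeat.pause option a bit more concrete, here is a minimal sketch of a deadline-style reachability check. It is a deliberately simplified model and not Akka's actual logic (as far as I know the remote watcher uses a phi-accrual failure detector). The class name, the 10 s heartbeat interval, and the 90 s stall are assumptions made up for illustration.

```java
class HeartbeatPauseSketch {

    // Simplified deadline model (an assumption, not Akka's phi-accrual logic):
    // a peer counts as unreachable once the silence since its last heartbeat
    // exceeds the heartbeat interval plus the acceptable pause.
    static boolean unreachable(long lastHeartbeatMs, long nowMs,
                               long intervalMs, long acceptablePauseMs) {
        return nowMs - lastHeartbeatMs > intervalMs + acceptablePauseMs;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000L; // assumed heartbeat interval
        long stallMs = 90_000L;    // hypothetical GC or network stall

        System.out.println("pause=60s  -> unreachable: "
                + unreachable(0L, stallMs, intervalMs, 60_000L));
        System.out.println("pause=120s -> unreachable: "
                + unreachable(0L, stallMs, intervalMs, 120_000L));
    }
}
```

Under these assumed numbers the 90 s stall trips the check with a 60 s pause but not with the 120 s pause we have configured, so a larger pause only helps if the stalls are shorter than the pause.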
>
>
> ___________________________________________________
> 8-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager
> - Container container_e29_1536261974019_1134_01_000036 failed. Exit
> status: -100
> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager
> - Diagnostics for container container_e29_1536261974019_1134_01_000036 in
> state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost*
> node
> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager
> - Total number of failed containers so far: 1
> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager
> - Requesting new TaskManager container with 20000 megabytes memory. Pending
> requests: 1
> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager
> - Task manager akka.tcp://flink@hellow-world2-13:41157/user/taskmanager
> terminated.
> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
> 2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched
> from state RUNNING to FAILING.
> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
> 2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor
> - Association with remote system [akka.tcp://flink@hello-world2-13:41157]
> has failed, address is now gated for [5000] ms. Reason: [Disassociated].
> The TaskManager in this container (on host 2-13) had just received:
> 2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory
> - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> ___________________________________________________
> But then failed with the same old
> 018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected
> unreachable: [akka.tcp://flink@hello-world3-3:44607]
> 2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager
> - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager
> terminated.
> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
> ___________________________________________________
>
> On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Subramanya,
>>
>> if the container is still running and the TM simply cannot connect to
>> the JobManager, then the ResourceManager does not see a problem. The RM
>> thinks in terms of containers and as long as n containers are running, it
>> won't start new ones. That's the reason why the TM should exit in order to
>> terminate the container.
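
The container accounting described above can be sketched roughly as follows. This is a toy model written for illustration and not the YarnFlinkResourceManager code; the class and method names are hypothetical. It only shows why an unreachable TaskManager whose container is still alive never leads to a replacement request, while a container reported as completed does.

```java
import java.util.HashSet;
import java.util.Set;

class ContainerAccountingSketch {
    private final int desiredContainers;
    private final Set<String> running = new HashSet<>();
    private int pendingRequests = 0;

    ContainerAccountingSketch(int desiredContainers) {
        this.desiredContainers = desiredContainers;
    }

    // A container came up; it satisfies one pending request if there is one.
    void containerStarted(String id) {
        running.add(id);
        if (pendingRequests > 0) {
            pendingRequests--;
        }
    }

    // YARN reported the container as completed (failed, preempted, lost node).
    void containerCompleted(String id) {
        if (running.remove(id)) {
            requestMissingContainers();
        }
    }

    // The JobManager lost its connection to the TM, but the container process
    // is still alive, so the set of running containers does not change.
    void taskManagerUnreachable(String id) {
        requestMissingContainers();
    }

    private void requestMissingContainers() {
        int missing = desiredContainers - running.size() - pendingRequests;
        if (missing > 0) {
            pendingRequests += missing;
        }
    }

    int pendingRequests() {
        return pendingRequests;
    }

    public static void main(String[] args) {
        ContainerAccountingSketch rm = new ContainerAccountingSketch(3);
        rm.containerStarted("c1");
        rm.containerStarted("c2");
        rm.containerStarted("c3");

        rm.taskManagerUnreachable("c1");
        System.out.println("pending after unreachable TM: " + rm.pendingRequests());

        rm.containerCompleted("c2");
        System.out.println("pending after completed container: " + rm.pendingRequests());
    }
}
```

In this model the only way to get a replacement for an unreachable TM is for its container to actually terminate, which is what the exit-on-fatal-akka-error flag is meant to force.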
>>
>> Have you tried using a newer Flink version? Lately we have reworked a
>> good part of Flink's distributed architecture and added resource elasticity
>> (starting with Flink 1.5).
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ss...@salesforce.com>
>> wrote:
>>
>>> Hi,
>>> With some additional research,
>>>
>>> *Before the flag*
>>> I realized that for failed containers (ones that exited for a specific
>>> reason) we were still requesting a new TM container and launching a TM.
>>> But for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>>> issue I do not see the container marked as failed or a subsequent request
>>> for a TM.
>>>
>>> *After `taskmanager.exit-on-fatal-akka-error: true`*
>>> I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am
>>> not sure whether that is a coincidence or a direct impact of this change.
>>>
>>> *Our Issue:*
>>> I realized we are still exiting the application, i.e. failing, when
>>> containers are lost. The reason is that before
>>> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
>>> container, launch a TM in it, and have it reported as started, the
>>> org.apache.flink.runtime.jobmanager.scheduler throws a
>>> NoResourceAvailableException that causes a failure. In our case we had a
>>> fixed-delay restart strategy with 5 attempts, and we are running out of
>>> them because of this. I am looking to solve this with a
>>> FailureRateRestartStrategy over a 2 minute interval (10 second restart
>>> delay, >12 failures), which lets the TM come back (takes about 50 seconds).
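
The arithmetic behind this can be sketched as a small simulation. It is a simplified model and not Flink's restart strategy implementation: it assumes every restart attempt fails immediately with NoResourceAvailableException until the replacement TM is up, and the class name, method names, and the 55 second recovery time (a bit over the roughly 50 seconds mentioned above) are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class RestartBudgetSketch {

    // Fixed-delay model: each restart attempt happens delaySec after the
    // previous failure and fails until the replacement TM is up.
    // Returns the second at which a restart succeeds, or -1 if attempts run out.
    static long fixedDelay(int maxAttempts, long delaySec, long tmReadyAtSec) {
        long now = 0L; // the TM is lost at t = 0
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            now += delaySec;
            if (now >= tmReadyAtSec) {
                return now;
            }
        }
        return -1L;
    }

    // Failure-rate model: give up only when more than maxFailures failures
    // fall inside a sliding window of intervalSec seconds.
    static long failureRate(int maxFailures, long intervalSec,
                            long delaySec, long tmReadyAtSec) {
        Deque<Long> failures = new ArrayDeque<>();
        long now = 0L;
        failures.addLast(now); // the initial TM loss counts as a failure
        while (true) {
            while (!failures.isEmpty() && now - failures.peekFirst() > intervalSec) {
                failures.removeFirst(); // forget failures outside the window
            }
            if (failures.size() > maxFailures) {
                return -1L;
            }
            now += delaySec;
            if (now >= tmReadyAtSec) {
                return now;
            }
            failures.addLast(now); // restart failed: still not enough slots
        }
    }

    public static void main(String[] args) {
        long tmReadyAtSec = 55L; // assumption: a bit over the ~50 s quoted above
        System.out.println("fixed-delay(5 attempts, 10s): "
                + fixedDelay(5, 10L, tmReadyAtSec));
        System.out.println("failure-rate(13 per 120s, 10s): "
                + failureRate(13, 120L, 10L, tmReadyAtSec));
    }
}
```

With these assumed numbers the fixed-delay budget is used up at 50 seconds, just before the TM is back, while the failure-rate variant is still allowed to retry at 60 seconds and succeeds.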
>>>
>>> *Flink Bug*
>>> But I cannot help but wonder why there is no interaction between the
>>> ResourceManager and JobManager, i.e. why is the JobManager continuing with
>>> the processing despite not having the required TMs?
>>>
>>> Logs to substantiate what I said above (only relevant).
>>>
>>> 018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Container container_e28_1535135887442_1381_01_000097
>>> completed successfully with diagnostics: Container released by application
>>>
>>>
>>> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>>>                     - Association with remote system [akka.tcp://
>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>> now gated for [5000] ms. Reason: [Disassociated]
>>> 2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Container container_e27_1535135887442_1381_01_000102
>>> failed. Exit status: -102
>>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Diagnostics for container container_e27_1535135887442_1381_01_000102
>>> in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by
>>> scheduler
>>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Total number of failed containers so far: 1
>>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Requesting new TaskManager container with 20000 megabytes
>>> memory. Pending requests: 1
>>> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Task manager
>>> akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
>>> 2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>> switched from state RUNNING to FAILING.
>>> java.lang.Exception: TaskManager was lost/killed:
>>> container_e27_1535135887442_1381_01_000102 @
>>> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
>>> 2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>            - Unregistered task manager
>>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task
>>> managers 144. Number of available slots 720
>>>
>>> 2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Received new container: container_e28_1535135887442_1381_01_000147
>>> - Remaining pending container requests: 0
>>> 2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Launching TaskManager in container ContainerInLaunch @
>>> 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147,
>>> NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
>>> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>>>                     - Association with remote system [akka.tcp://
>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>> 2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
>>>                   - Remote connection to [null] failed with
>>> java.net.ConnectException: Connection refused:
>>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
>>> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>>>                     - Association with remote system [akka.tcp://
>>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>> 2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>> switched from state RESTARTING to CREATED.
>>>
>>> 2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>> switched from state CREATED to RUNNING.
>>> 2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>> switched from state RUNNING to FAILING.
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Not enough free slots available to run the job. You can decrease the
>>> operator parallelism or increase the number of slots per TaskManager in the
>>> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
>>> =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
>>> _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
>>> cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
>>> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>>>
>>>
>>>
>>>
>>> 2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - TaskManager container_e28_1535135887442_1381_01_000147
>>> has started.
>>> 2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Container container_e28_1535135887442_1381_01_000147
>>> failed. Exit status: -102
>>> 2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Diagnostics for container container_e28_1535135887442_1381_01_000147
>>> in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by
>>> scheduler
>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Task manager
>>> akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Total number of failed containers so far: 2
>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>            - Unregistered task manager
>>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task
>>> managers 144. Number of available slots 720.
>>>
>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Received new container: container_e28_1535135887442_1381_01_000202
>>> - Remaining pending container requests: 0
>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Launching TaskManager in container ContainerInLaunch @
>>> 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202,
>>> NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>> 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
>>> 2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Received new container: container_e28_1535135887442_1381_01_000203
>>> - Remaining pending container requests: 0
>>> 2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Returning excess container
>>> container_e28_1535135887442_1381_01_000203
>>>
>>> Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202
>>> has started" message.
>>> Instead I see:
>>> 2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>>> switched from state FAILING to FAILED.
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Not enough free slots available to run the job. You can decrease the
>>> operator parallelism or increase the number of slots per TaskManager in the
>>> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>>>
>>>
>>> 2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Shutting down cluster with status FAILED : The monitored
>>> job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
>>> 2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>               - Unregistering application from the YARN Resource Manager
>>>
>>>
>>>
>>>
>>> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <
>>> ssuresh@salesforce.com> wrote:
>>>
>>>> Thanks Till,
>>>> I do not see any Akka-related messages in that TaskManager after the
>>>> initial startup. It seemed like all was well. So after the RemoteWatcher
>>>> detects it as unreachable and the JobManager unregisters it, I do not see
>>>> any other activity in the JobManager with regard to reallocation etc.
>>>> - Does the quarantining of the TaskManager not happen until
>>>> exit-on-fatal-akka-error is turned on?
>>>> - Do the JobManager and the TaskManager try to reconnect to each other
>>>> again? Is there a different setting for it?
>>>> - Does the JobManager not reallocate a TaskManager despite it being
>>>> unregistered, until the TaskManager exits? I think it should, especially
>>>> if it is not trying to establish a connection again.
>>>>
>>>> I will give the flag a try.
>>>>
>>>> Sincerely,
>>>>
>>>>
>>>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Could you check whether
>>>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running,
>>>>> e.g. whether it tries to reconnect to the JobManager? If this is the
>>>>> case, then the container is still running and the
>>>>> YarnFlinkResourceManager thinks that everything is alright. You can make
>>>>> a TaskManager kill itself when it gets quarantined by setting
>>>>> `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>>>>> ssuresh@salesforce.com> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>> Greatly appreciate your reply.
>>>>>> We use version 1.4.2. I do not see anything unusual in the logs for the
>>>>>> TM that was lost. Note: I have looked at many such failures and see the
>>>>>> same pattern below.
>>>>>>
>>>>>> The JM logs above had most of what I had, but below is what I
>>>>>> have when I search for flink.yarn (we have huge logs otherwise, given the
>>>>>> number of SQL queries we run). The gist is: Akka detects the TM as
>>>>>> unreachable, the TM is marked lost and unregistered by the JM, operators
>>>>>> start failing with NoResourceAvailableException since there is one less
>>>>>> TM, and 5 retry attempts later the job goes down.
>>>>>>
>>>>>> ………….
>>>>>>
>>>>>> 2018-08-29 23:02:41,216 INFO
>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>> TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>>>>
>>>>>> 2018-08-29 23:02:50,095 INFO
>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>> TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>>>>
>>>>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Submitting job
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>
>>>>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Using restart strategy
>>>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>
>>>>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Running initialization on master for job
>>>>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>>>
>>>>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Successfully ran initialization on master
>>>>>> in 0 ms.
>>>>>>
>>>>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Using application-defined state backend
>>>>>> for checkpoint/savepoint metadata: File State Backend @
>>>>>> hdfs://security-temp/savedSearches/checkpoint.
>>>>>>
>>>>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Scheduling job
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>
>>>>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Attempting to recover all jobs.
>>>>>>
>>>>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - There are 1 jobs to recover. Starting the
>>>>>> job recovery.
>>>>>>
>>>>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Attempting to recover job
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>
>>>>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Ignoring job recovery for
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>>>>
>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Task manager
>>>>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>
>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>
>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>
>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Received message for non-existing
>>>>>> checkpoint 1
>>>>>>
>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>> is in terminal state FAILED. Shutting down session
>>>>>>
>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Stopping JobManager with final application
>>>>>> status FAILED and diagnostics: The monitored job with ID
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>
>>>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlin
>>>>>> kResourceManager                - Shutting down cluster with status
>>>>>> FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>> has failed to complete.
>>>>>>
>>>>>> 2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlin
>>>>>> kResourceManager                - Unregistering application from the
>>>>>> YARN Resource Manager
>>>>>>
>>>>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Deleting yarn application files under
>>>>>> hdfs://security-temp/user/sec-data-app/.flink/application_15
>>>>>> 35135887442_0906.
>>>>>>
>>>>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Stopping JobManager akka.tcp://flink@
>>>>>> blahxyz.sfdc.net:1235/user/jobmanager.
>>>>>>
>>>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Actor system shut down timed out.
>>>>>>
>>>>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Shutdown completed. Stopping JVM.
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Subramanya,
>>>>>>>
>>>>>>> in order to help you I need a little bit of context. Which version
>>>>>>> of Flink are you running? The configuration yarn.reallocate-failed has
>>>>>>> been deprecated since Flink 1.1 and no longer has any effect.
>>>>>>>
>>>>>>> What would be helpful is to get the full jobmanager log from you. If
>>>>>>> the YarnFlinkResourceManager gets notified that a container has failed, it
>>>>>>> should restart this container (it will do this 145 times). So if the
>>>>>>> YarnFlinkResourceManager does not get notified about a completed container,
>>>>>>> then this might indicate that the container is still running. So it would
>>>>>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>>>>>> say.
>>>>>>>
>>>>>>> Moreover, do you see the same problem occur when using the latest
>>>>>>> release Flink 1.6.0?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>>>>> ssuresh@salesforce.com> wrote:
>>>>>>>
>>>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>>>> then never re-allocated and subsequently operators fail with
>>>>>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>>>>>> restarts of 5) the application goes down.
>>>>>>>>
>>>>>>>>    - We have explicitly set *yarn.reallocate-failed: *true and
>>>>>>>>    have not specified the yarn.maximum-failed-containers (and see
>>>>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>>>>>    YARN application tolerates 145 failed TaskManager containers before giving
>>>>>>>>    up” in the logs).
>>>>>>>>    - After the initial startup where all 145 TaskManagers are
>>>>>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>>>>>    container” to reallocate failed container.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>>>>                           - Detected unreachable: [akka.tcp://
>>>>>>>> flink@blahabc.sfdc.net:123]
>>>>>>>>
>>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Task manager akka.tcp://
>>>>>>>> flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>>
>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>>>>> (dataPort=124)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(
>>>>>>>> SimpleSlot.java:217)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment
>>>>>>>> .releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(
>>>>>>>> SharedSlot.java:192)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance
>>>>>>>> .java:167)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregister
>>>>>>>> TaskManager(InstanceManager.java:212)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$
>>>>>>>> flink$runtime$jobmanager$JobManager$$handleTaskManagerT
>>>>>>>> erminated(JobManager.scala:1198)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>>>>>>>> handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>
>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>>>>>>>> unction.scala:36)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobMana
>>>>>>>> ger$$anonfun$handleContainerMessage$1.applyOrElse(Containere
>>>>>>>> dJobManager.scala:107)
>>>>>>>>
>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>
>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShut
>>>>>>>> down$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>
>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun
>>>>>>>> $receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>
>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>>>>>>>> unction.scala:36)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>>>>>>>> es.scala:33)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>>>>>>>> es.scala:28)
>>>>>>>>
>>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scal
>>>>>>>> a:123)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(
>>>>>>>> LogMessages.scala:28)
>>>>>>>>
>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive
>>>>>>>> (JobManager.scala:122)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>>
>>>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(Death
>>>>>>>> Watch.scala:46)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>>
>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>>
>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>>
>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>>>>>>>> java:260)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>>>> ForkJoinPool.java:1339)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>>>>>>>> l.java:1979)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>>>>>>>> orkerThread.java:107)
>>>>>>>>
>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414
>>>>>>>> (dataPort=124)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(
>>>>>>>> SimpleSlot.java:217)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment
>>>>>>>> .releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(
>>>>>>>> SharedSlot.java:192)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance
>>>>>>>> .java:167)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregister
>>>>>>>> TaskManager(InstanceManager.java:212)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$
>>>>>>>> flink$runtime$jobmanager$JobManager$$handleTaskManagerT
>>>>>>>> erminated(JobManager.scala:1198)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>>>>>>>> handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>
>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>>>>>>>> unction.scala:36)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobMana
>>>>>>>> ger$$anonfun$handleContainerMessage$1.applyOrElse(Containere
>>>>>>>> dJobManager.scala:107)
>>>>>>>>
>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>
>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShut
>>>>>>>> down$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>
>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun
>>>>>>>> $receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>
>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>>>>>>>> unction.scala:36)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>>>>>>>> es.scala:33)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>>>>>>>> es.scala:28)
>>>>>>>>
>>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scal
>>>>>>>> a:123)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(
>>>>>>>> LogMessages.scala:28)
>>>>>>>>
>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>>
>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive
>>>>>>>> (JobManager.scala:122)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>>
>>>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(Death
>>>>>>>> Watch.scala:46)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>>
>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>>
>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>>
>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>>
>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>>>>>>>> java:260)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>>>> ForkJoinPool.java:1339)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>>>>>>>> l.java:1979)
>>>>>>>>
>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>>>>>>>> orkerThread.java:107)
>>>>>>>>
>>>>>>>> 2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>>>>>             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1.
>>>>>>>> Number of registered task managers 144. Number of available slots 720.
>>>>>>>>
>>>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Received message for non-existing
>>>>>>>> checkpoint 1
>>>>>>>>
>>>>>>>> 2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints
>>>>>>>> from ZooKeeper.
>>>>>>>>
>>>>>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in
>>>>>>>> ZooKeeper.
>>>>>>>>
>>>>>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0
>>>>>>>> checkpoints from storage.
>>>>>>>>
>>>>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>>>>> Not enough free slots available to run the job. You can decrease the
>>>>>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>>>>>> configuration. Task to schedule: < Attempt #1
>>>>>>>>
>>>>>>>>
>>>>>>>> *After 5 retries of our SQL query execution graph (we have configured
>>>>>>>> a fixed-delay restart strategy with 5 attempts), it outputs the following:*
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>>     - Stopping checkpoint coordinator for job
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCompletedCheckpointStore  - Shutting down
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCompletedCheckpointStore  - Removing
>>>>>>>> /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>>>> from ZooKeeper
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCheckpointIDCounter  - Shutting down.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.check
>>>>>>>> point.ZooKeeperCheckpointIDCounter  - Removing
>>>>>>>> /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobma
>>>>>>>> nager.ZooKeeperSubmittedJobGraphStore  - Removed job graph
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Job with ID
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED.
>>>>>>>> Shutting down session
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>                         - Stopping JobManager with final
>>>>>>>> application status FAILED and diagnostics: The monitored job with ID
>>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>>
>>>>>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlin
>>>>>>>> kResourceManager
>>>>>>>>
>>>>>>>

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Subramanya Suresh <ss...@salesforce.com>.
Hi Till,
*After taskmanager.exit-on-fatal-akka-error: true*
I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am not
sure whether that is a coincidence or a direct effect of this change.

Quick update: I can confirm that our issue still happens even with the flag
set to true. With a proper restart strategy (rate-based, which gives it
enough time), the job can recover from container failures like the first case
below, but it cannot recover from "Detected unreachable" issues like the
second case below.
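
As a rough illustration of why the restart strategy matters here, the following back-of-the-envelope Python sketch compares how long each strategy can bridge a missing TaskManager. It uses the numbers mentioned elsewhere in this thread (5 fixed-delay attempts with a 10 second delay, a failure-rate strategy with a 10 second delay over a 2 minute interval, and roughly 50 seconds for a replacement TM to come up). The function names and the model itself are assumptions made for illustration: it assumes every restart attempt fails immediately while slots are missing, and it is not Flink's actual scheduling logic.

```python
# Simplified model (an assumption, not Flink's scheduler): while the
# replacement TaskManager is missing, every restart attempt fails right away,
# so consecutive restarts are spaced exactly `delay_s` seconds apart.

def outage_budget_s(max_restarts: int, delay_s: float) -> float:
    """Longest slot outage (in seconds) the restart budget can bridge."""
    return max_restarts * delay_s


def has_headroom(max_restarts: int, delay_s: float, recovery_s: float) -> bool:
    """True if the budget strictly exceeds the time the new TM needs."""
    return outage_budget_s(max_restarts, delay_s) > recovery_s


TM_RECOVERY_S = 50  # "takes about 50 seconds" according to the thread

# Fixed delay: 5 attempts, 10 s apart (values from the JobManager log).
fixed = has_headroom(max_restarts=5, delay_s=10, recovery_s=TM_RECOVERY_S)

# Failure rate: 12 failures per 2 minute interval, 10 s delay
# (the thread mentions ">12 failures"; 12 is used as a lower bound).
rate = has_headroom(max_restarts=12, delay_s=10, recovery_s=TM_RECOVERY_S)

print(fixed, rate)
```

Under these assumptions the fixed-delay budget (50 s) leaves no margin over the TM startup time, while the rate-based budget (120 s) does. Real behaviour also depends on scheduling time and on further container losses during recovery.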

We are currently using the configuration below. So I guess the only options
left are to increase akka.watch.heartbeat.pause or move to 1.6 as you suggested.


akka.client.timeout: 600s
akka.ask.timeout: 600s
akka.lookup.timeout: 600s
akka.watch.heartbeat.pause: 120s


___________________________________________________
2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager -
Container container_e29_1536261974019_1134_01_000036 failed. Exit status:
-100
2018-09-11 16:30:44,862 INFO
org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container
container_e29_1536261974019_1134_01_000036 in state COMPLETE :
exitStatus=-100 diagnostics=Container released on a *lost* node
2018-09-11 16:30:44,862 INFO
org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed
containers so far: 1
2018-09-11 16:30:44,862 INFO
org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager
container with 20000 megabytes memory. Pending requests: 1
2018-09-11 16:30:44,862 INFO
org.apache.flink.yarn.YarnJobManager - Task manager
akka.tcp://flink@hello-world2-13:41157/user/taskmanager terminated.
at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:44,868 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from
state RUNNING to FAILING.
at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:49,700 WARN
akka.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://flink@hello-world2-13:41157] has failed, address is now gated
for [5000] ms. Reason: [Disassociated].
The TaskManager in this container (2-13) just logged that it received a SIGTERM:
2018-09-11 16:30:47,195 INFO
org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15:
SIGTERM. Shutting down as requested.
___________________________________________________
But then it failed with the same old issue:
2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected
unreachable: [akka.tcp://flink@hello-world3-3:44607]
2018-09-11 16:42:58,409 INFO
org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@
hello-world3-3/user/taskmanager terminated.
at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
___________________________________________________
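
The difference between the two cases above is whether a "Requesting new TaskManager container" line shows up near the "Task manager ... terminated" line. A minimal Python sketch for spotting terminations that were never followed up by a container request could look like this. The function name, the `window` parameter and the assumption that each log record has already been joined into a single string are illustrative and not part of Flink.

```python
import re

# Patterns taken from the JobManager log lines quoted in this thread.
TERMINATED = re.compile(r"Task manager (\S+) terminated")
REQUESTED = "Requesting new TaskManager container"


def unreplaced_terminations(records, window=5):
    """Return addresses of terminated TaskManagers for which no container
    request appears within `window` records before or after the event.

    `records` is a list of log records, one string per record
    (hypothetical input format: wrapped lines are assumed to be joined).
    """
    missing = []
    for i, rec in enumerate(records):
        match = TERMINATED.search(rec)
        if not match:
            continue
        nearby = records[max(0, i - window): i + window + 1]
        if not any(REQUESTED in r for r in nearby):
            missing.append(match.group(1))
    return missing


# Case 1: container failure reported by YARN, replacement requested.
case1 = [
    "Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100",
    "Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1",
    "Task manager akka.tcp://flink@hello-world2-13:41157/user/taskmanager terminated.",
]

# Case 2: Akka reports the TM as unreachable, no replacement requested.
case2 = [
    "Detected unreachable: [akka.tcp://flink@hello-world3-3:44607]",
    "Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.",
]

print(unreplaced_terminations(case1))
print(unreplaced_terminations(case2))
```

Run against a full JobManager log, a check like this would list only the TaskManagers that were dropped after a "Detected unreachable" event without YARN ever reporting the container as failed.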

On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Subramanya,
>
> if the container is still running and the TM simply cannot connect to the
> JobManager, then the ResourceManager does not see a problem. The RM thinks
> in terms of containers and as long as n containers are running, it won't
> start new ones. That's the reason why the TM should exit in order to
> terminate the container.
>
> Have you tried using a newer Flink version? Lately we have reworked a good
> part of Flink's distributed architecture and added resource elasticity
> (starting with Flink 1.5).
>
> Cheers,
> Till
>
> On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ss...@salesforce.com>
> wrote:
>
>> Hi,
>> With some additional research,
>>
>> *Before the flag*
>> I realized that for failed containers (those that exited for a specific
>> reason) we were still requesting a new TM container and launching a TM. But
>> for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]" issue
>> I do not see the container marked as failed or a subsequent request for a TM.
>>
>> *After taskmanager.exit-on-fatal-akka-error: true*
>> I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am
>> not sure whether that is a coincidence or a direct effect of this change.
>>
>> *Our Issue:*
>> I realized we are still exiting the application, i.e. failing when the
>> containers are lost. The reason is that before
>> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
>> container, launch a TM in it, and see it reported as started, the
>> org.apache.flink.runtime.jobmanager.scheduler throws a
>> NoResourceAvailableException that causes a failure. In our case we had a
>> fixed-delay restart strategy with 5 attempts, and we are running out of them
>> because of this. I am looking to solve this with a FailureRateRestartStrategy
>> over a 2 minute interval (10 second restart delay, >12 failures), which lets
>> the TM come back (that takes about 50 seconds).
>>
>> *Flink Bug*
>> But I cannot help wondering why there is no interaction between the
>> ResourceManager and the JobManager, i.e. why does the JobManager continue
>> with the processing despite not having the required TMs?
>>
>> Logs to substantiate what I said above (only relevant).
>>
>> 2018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Container container_e28_1535135887442_1381_01_000097
>> completed successfully with diagnostics: Container released by application
>>
>>
>> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>>                     - Association with remote system [akka.tcp://
>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
>> gated for [5000] ms. Reason: [Disassociated]
>> 2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Container container_e27_1535135887442_1381_01_000102
>> failed. Exit status: -102
>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Diagnostics for container container_e27_1535135887442_1381_01_000102
>> in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by
>> scheduler
>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Total number of failed containers so far: 1
>> 2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Requesting new TaskManager container with 20000 megabytes
>> memory. Pending requests: 1
>> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Task manager akka.tcp://flink@hello-world9-
>> 27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
>> 2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>> switched from state RUNNING to FAILING.
>> java.lang.Exception: TaskManager was lost/killed:
>> container_e27_1535135887442_1381_01_000102 @
>> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
>> 2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager
>>            - Unregistered task manager hello-world9-27-crz.ops.sfdc.
>> net/11.11.35.220. Number of registered task managers 144. Number of
>> available slots 720
>>
>> 2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Received new container: container_e28_1535135887442_1381_01_000147
>> - Remaining pending container requests: 0
>> 2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Launching TaskManager in container ContainerInLaunch @
>> 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147,
>> NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
>> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
>> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>>                     - Association with remote system [akka.tcp://
>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
>> gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>> 2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
>>                   - Remote connection to [null] failed with
>> java.net.ConnectException: Connection refused:
>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
>> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>>                     - Association with remote system [akka.tcp://
>> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
>> gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>> 2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>> switched from state RESTARTING to CREATED.
>> 2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>> switched from state CREATED to RUNNING.
>> 2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>> switched from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager in the
>> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
>> =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
>> _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
>> cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
>> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>>
>>
>>
>>
>> 2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - TaskManager container_e28_1535135887442_1381_01_000147
>> has started.
>> 2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Container container_e28_1535135887442_1381_01_000147
>> failed. Exit status: -102
>> 2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Diagnostics for container container_e28_1535135887442_1381_01_000147
>> in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by
>> scheduler
>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Task manager akka.tcp://flink@hello-world9-
>> 27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Total number of failed containers so far: 2
>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager
>>            - Unregistered task manager hello-world9-27-crz.ops.sfdc.
>> net/11.11.35.220. Number of registered task managers 144. Number of
>> available slots 720.
>>
>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Received new container: container_e28_1535135887442_1381_01_000202
>> - Remaining pending container requests: 0
>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Launching TaskManager in container ContainerInLaunch @
>> 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202,
>> NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
>> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>> 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
>> 2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Received new container: container_e28_1535135887442_1381_01_000203
>> - Remaining pending container requests: 0
>> 2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Returning excess container container_e28_1535135887442_
>> 1381_01_000203
>>
>> Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202
>> has started" message.
>> Instead I see:
>> 2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981)
>> switched from state FAILING to FAILED.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager in the
>> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>>
>>
>> 2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Shutting down cluster with status FAILED : The monitored
>> job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
>> 2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>               - Unregistering application from the YARN Resource Manager
>>
>>
>>
>>
>> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <
>> ssuresh@salesforce.com> wrote:
>>
>>> Thanks Till,
>>> I do not see any Akka-related messages in that TaskManager's log after the
>>> initial startup. It seemed like all was well. So after the RemoteWatcher
>>> detects it as unreachable and the JobManager unregisters it, I do not see any
>>> other activity in the JobManager with regard to reallocation etc.
>>> - Does the quarantining of the TaskManager not happen until
>>> exit-on-fatal-akka-error is turned on?
>>> - Do the JobManager and the TaskManager try to reconnect to each other
>>> again? Is there a different setting for it?
>>> - Does the JobManager not reallocate a TaskManager, despite it being
>>> unregistered, until the TaskManager exits? I think it should, especially
>>> if it is not trying to establish a connection again.
>>>
>>> I will give the flag a try.
>>>
>>> Sincerely,
>>>
>>>
>>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Could you check whether
>>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running,
>>>> e.g. whether it tries to reconnect to the JobManager? If this is the case,
>>>> then the container is still running and the YarnFlinkResourceManager thinks
>>>> that everything is alright. You can make a TaskManager kill itself when it
>>>> gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in
>>>> the `flink-conf.yaml`.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>>>> ssuresh@salesforce.com> wrote:
>>>>
>>>>> Hi Till,
>>>>> Greatly appreciate your reply.
>>>>> We use version 1.4.2. I do not see anything unusual in the logs for the TM
>>>>> that was lost. Note: I have looked at many such failures and see the same
>>>>> pattern, shown below.
>>>>>
>>>>> The JM logs above had most of what I had, but below is what I get
>>>>> when I search for flink.yarn (we have huge logs otherwise, given the number
>>>>> of SQL queries we run). The gist is: Akka detects the TM as unreachable, the
>>>>> TM is marked lost and unregistered by the JM, operators start failing with
>>>>> NoResourceAvailableException since there is one less TM, and 5 retry
>>>>> attempts later the job goes down.
>>>>>
>>>>> ………….
>>>>>
>>>>> 2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.
>>>>> YarnFlinkResourceManager                - TaskManager
>>>>> container_e27_1535135887442_0906_01_000124 has started.
>>>>>
>>>>> 2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.
>>>>> YarnFlinkResourceManager                - TaskManager
>>>>> container_e27_1535135887442_0906_01_000159 has started.
>>>>>
>>>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> (streaming-searches-prod).
>>>>>
>>>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Using restart strategy
>>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>
>>>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Running initialization on master for job
>>>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>>
>>>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Successfully ran initialization on master in
>>>>> 0 ms.
>>>>>
>>>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Using application-defined state backend for
>>>>> checkpoint/savepoint metadata: File State Backend @
>>>>> hdfs://security-temp/savedSearches/checkpoint.
>>>>>
>>>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> (streaming-searches-prod).
>>>>>
>>>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Attempting to recover all jobs.
>>>>>
>>>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - There are 1 jobs to recover. Starting the job
>>>>> recovery.
>>>>>
>>>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Attempting to recover job
>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>
>>>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Ignoring job recovery for
>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>>>
>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
>>>>> terminated.
>>>>>
>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.
>>>>> applyOrElse(YarnJobManager.scala:110)
>>>>>
>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.
>>>>> applyOrElse(YarnJobManager.scala:110)
>>>>>
>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Received message for non-existing checkpoint 1
>>>>>
>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> is in terminal state FAILED. Shutting down session
>>>>>
>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Stopping JobManager with final application
>>>>> status FAILED and diagnostics: The monitored job with ID
>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>
>>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.
>>>>> YarnFlinkResourceManager                - Shutting down cluster with
>>>>> status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> has failed to complete.
>>>>>
>>>>> 2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.
>>>>> YarnFlinkResourceManager                - Unregistering application
>>>>> from the YARN Resource Manager
>>>>>
>>>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Deleting yarn application files under
>>>>> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>>>
>>>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Stopping JobManager
>>>>> akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>>>>>
>>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>>                       - Actor system shut down timed out.
>>>>>
>>>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Shutdown completed. Stopping JVM.
>>>>>
>>>>>
>>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Subramanya,
>>>>>>
>>>>>> in order to help you I need a little bit of context. Which version of
>>>>>> Flink are you running? The configuration yarn.reallocate-failed has been
>>>>>> deprecated since Flink 1.1 and no longer has any effect.
>>>>>>
>>>>>> What would be helpful is to get the full jobmanager log from you. If
>>>>>> the YarnFlinkResourceManager gets notified that a container has failed, it
>>>>>> should restart this container (it will do this 145 times). So if the
>>>>>> YarnFlinkResourceManager does not get notified about a completed container,
>>>>>> then this might indicate that the container is still running. So it would
>>>>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>>>>> say.
>>>>>>
>>>>>> Moreover, do you see the same problem occur when using the latest
>>>>>> release Flink 1.6.0?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>>>> ssuresh@salesforce.com> wrote:
>>>>>>
>>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>>> then never re-allocated and subsequently operators fail with
>>>>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>>>>> restarts of 5) the application goes down.
>>>>>>>
>>>>>>>    - We have explicitly set *yarn.reallocate-failed: *true and have
>>>>>>>    not specified the yarn.maximum-failed-containers (and see
>>>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>>>>    YARN application tolerates 145 failed TaskManager containers before giving
>>>>>>>    up” in the logs).
>>>>>>>    - After the initial startup where all 145 TaskManagers are
>>>>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>>>>    container” to reallocate failed container.
>>>>>>>
>>>>>>>
>>>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>>>                         - Detected unreachable: [akka.tcp://
>>>>>>> flink@blahabc.sfdc.net:123]
>>>>>>>
>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Task manager akka.tcp://
>>>>>>> flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>
>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>>>> (dataPort=124)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.
>>>>>>> releaseSlot(SimpleSlot.java:217)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
>>>>>>> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.
>>>>>>> releaseSlot(SharedSlot.java:192)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(
>>>>>>> Instance.java:167)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.
>>>>>>> unregisterTaskManager(InstanceManager.java:212)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$
>>>>>>> apache$flink$runtime$jobmanager$JobManager$$
>>>>>>> handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>>>> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>
>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$
>>>>>>> anonfun$handleContainerMessage$1.applyOrElse(
>>>>>>> ContaineredJobManager.scala:107)
>>>>>>>
>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>
>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$
>>>>>>> handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>
>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
>>>>>>> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>
>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>> LogMessages.scala:33)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>> LogMessages.scala:28)
>>>>>>>
>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.
>>>>>>> scala:123)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.
>>>>>>> applyOrElse(LogMessages.scala:28)
>>>>>>>
>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.
>>>>>>> aroundReceive(JobManager.scala:122)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>
>>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
>>>>>>> DeathWatch.scala:46)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>
>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>
>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>
>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>>>>> ForkJoinTask.java:260)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>>>> runTask(ForkJoinPool.java:1339)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>>>>> ForkJoinPool.java:1979)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414
>>>>>>> (dataPort=124)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.
>>>>>>> releaseSlot(SimpleSlot.java:217)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
>>>>>>> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.
>>>>>>> releaseSlot(SharedSlot.java:192)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(
>>>>>>> Instance.java:167)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.
>>>>>>> unregisterTaskManager(InstanceManager.java:212)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$
>>>>>>> apache$flink$runtime$jobmanager$JobManager$$
>>>>>>> handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>>>> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>
>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$
>>>>>>> anonfun$handleContainerMessage$1.applyOrElse(
>>>>>>> ContaineredJobManager.scala:107)
>>>>>>>
>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>
>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$
>>>>>>> handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>
>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
>>>>>>> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>
>>>>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>>>>> AbstractPartialFunction.scala:36)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>> LogMessages.scala:33)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>>>>> LogMessages.scala:28)
>>>>>>>
>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.
>>>>>>> scala:123)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.
>>>>>>> applyOrElse(LogMessages.scala:28)
>>>>>>>
>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>
>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.
>>>>>>> aroundReceive(JobManager.scala:122)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>
>>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
>>>>>>> DeathWatch.scala:46)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>
>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>
>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>
>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>
>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>>>>> ForkJoinTask.java:260)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>>>> runTask(ForkJoinPool.java:1339)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>>>>> ForkJoinPool.java:1979)
>>>>>>>
>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> 2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>>>>             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1.
>>>>>>> Number of registered task managers 144. Number of available slots 720.
>>>>>>>
>>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Received message for non-existing
>>>>>>> checkpoint 1
>>>>>>>
>>>>>>> 2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from
>>>>>>> ZooKeeper.
>>>>>>>
>>>>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in
>>>>>>> ZooKeeper.
>>>>>>>
>>>>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints
>>>>>>> from storage.
>>>>>>>
>>>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>>>> Not enough free slots available to run the job. You can decrease the
>>>>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>>>>> configuration. Task to schedule: < Attempt #1
>>>>>>>
>>>>>>>
>>>>>>> *After 5 retries of our Sql query execution graph (we have
>>>>>>> configured 5 fixed delay restart), it outputs the below, *
>>>>>>>
>>>>>>>
>>>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>     - Stopping checkpoint coordinator for job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>>>
>>>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCompletedCheckpointStore  - Shutting down
>>>>>>>
>>>>>>> 2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCompletedCheckpointStore  - Removing
>>>>>>> /prod/link/application_1535135887442_0906/checkpoints/
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>>>
>>>>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCheckpointIDCounter  - Shutting down.
>>>>>>>
>>>>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.
>>>>>>> ZooKeeperCheckpointIDCounter  - Removing /checkpoint-counter/
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>>>
>>>>>>> 2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.
>>>>>>> ZooKeeperSubmittedJobGraphStore  - Removed job graph
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>>> is in terminal state FAILED. Shutting down session
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Stopping JobManager with final
>>>>>>> application status FAILED and diagnostics: The monitored job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.
>>>>>>> YarnFlinkResourceManager
>>>>>>>
>>>>>>



Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Till Rohrmann <tr...@apache.org>.
Hi Subramanya,

if the container is still running and the TM simply cannot connect to the
JobManager, then the ResourceManager does not see a problem. The RM thinks
in terms of containers, and as long as n containers are running, it won't
start new ones. That's why the TM should exit in order to
terminate the container.
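
A toy model of the two views may make this concrete. It is only a sketch of the behaviour described above, not Flink's actual code: the `Container` class, its field names, and both functions are hypothetical and exist purely for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Container:
    """Hypothetical stand-in for one YARN container running a TaskManager."""
    container_id: str
    process_alive: bool = True      # what YARN reports to the ResourceManager
    reachable_from_jm: bool = True  # what the JobManager's Akka watcher sees


def containers_to_request(desired: int, containers: List[Container]) -> int:
    """ResourceManager view: only containers whose process is gone are replaced."""
    running = sum(1 for c in containers if c.process_alive)
    return max(0, desired - running)


def registered_task_managers(containers: List[Container]) -> int:
    """JobManager view: a TaskManager it cannot reach is unregistered."""
    return sum(1 for c in containers if c.process_alive and c.reachable_from_jm)


cluster = [Container(f"container_{i:06d}") for i in range(1, 146)]  # 145 TMs

# One TaskManager becomes unreachable, but its JVM keeps running.
cluster[38].reachable_from_jm = False
print(registered_task_managers(cluster), containers_to_request(145, cluster))

# With exit-on-fatal-akka-error the quarantined TaskManager exits,
# so the container completes and the ResourceManager notices.
cluster[38].process_alive = False
print(registered_task_managers(cluster), containers_to_request(145, cluster))
```

In this model the unreachable-but-alive TaskManager leaves the JobManager with 144 registered TaskManagers while the ResourceManager requests nothing. Only once the process exits does the number of containers to request go to 1.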

Have you tried using a newer Flink version? Lately we have reworked a good
part of Flink's distributed architecture and added resource elasticity
(starting with Flink 1.5).

Cheers,
Till

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ss...@salesforce.com>
wrote:

> Hi,
> With some additional research,
>
> *Before the flag*
> I realized that for failed containers (those that exited for a specific
> reason) we were still requesting a new TM container and launching a TM. But
> for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
> issue I do not see the container marked as failed, nor a subsequent request
> for a TM.
>
> *After `taskmanager.exit-on-fatal-akka-error: true`*
> I have not seen any "Detected unreachable:
> [akka.tcp://flink@blahabc.sfdc.net:123]" messages since we turned on
> taskmanager.exit-on-fatal-akka-error: true. I am not sure whether that is a
> coincidence or a direct effect of this change.
>
> *Our Issue:*
> I realized we are still exiting the application, i.e. failing when
> containers are lost. The reason is that before
> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
> container, launch the TM, and have it reported as started, the
> org.apache.flink.runtime.jobmanager.scheduler throws a
> NoResourceAvailableException that causes a failure. In our case we had a
> fixed-delay restart strategy with 5 attempts, and we are running out of them
> because of this. I am looking to solve this with a
> FailureRateRestartStrategy over a 2-minute interval (10-second restart
> delay, >12 failures), which gives the TM time to come back (takes about 50
> seconds).
>
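
The restart budget in the paragraph above can be checked with a little arithmetic. The sketch below is a simplified model, not Flink's restart-strategy implementation: it assumes every restart attempt is made exactly `delay_s` seconds after the previous failure and fails immediately while the replacement TM is still missing. All function and parameter names are made up for illustration.

```python
from collections import deque


def fixed_delay_budget_s(max_attempts: int, delay_s: float) -> float:
    """Longest outage (seconds) a fixed-delay strategy can ride out in this model."""
    return max_attempts * delay_s


def fixed_delay_survives(max_attempts: int, delay_s: float, recovery_s: float) -> bool:
    """True if the last allowed restart happens once the replacement TM is ready."""
    return fixed_delay_budget_s(max_attempts, delay_s) >= recovery_s


def failure_rate_survives(max_failures: int, interval_s: float,
                          delay_s: float, recovery_s: float) -> bool:
    """Sliding-window model: give up once more than max_failures fall in interval_s."""
    window = deque()  # failure times still inside the interval
    now = 0.0         # the TM is lost and the job fails for the first time
    while True:
        window.append(now)
        while now - window[0] > interval_s:
            window.popleft()
        if len(window) > max_failures:
            return False   # too many failures inside the interval
        now += delay_s     # wait, then restart
        if now >= recovery_s:
            return True    # slots are back, so this restart succeeds


print(fixed_delay_budget_s(5, 10))             # 5 attempts x 10 s delay
print(fixed_delay_survives(5, 10, 60))         # a 60 s outage, fixed delay
print(failure_rate_survives(12, 120, 10, 60))  # same outage, failure-rate window
```

With the numbers from this thread the fixed-delay budget is 5 x 10 s = 50 s, which is about the same as the roughly 50 seconds the TM needs to come back, so there is no margin. In the same model a window of 12 failures per 120 s rides out a 60 s outage. As far as I know the matching `flink-conf.yaml` keys are `restart-strategy: failure-rate`, `restart-strategy.failure-rate.max-failures-per-interval`, `restart-strategy.failure-rate.failure-rate-interval` and `restart-strategy.failure-rate.delay`, but please check them against the documentation for your Flink version.
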
> *Flink Bug*
> I cannot help but wonder why there is no interaction between the
> ResourceManager and JobManager, i.e. why does the JobManager continue with
> the processing despite not having the required TMs?
>
> Logs to substantiate what I said above (only relevant).
>
> 2018-09-03 06:17:13,932 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
> container_e28_1535135887442_1381_01_000097 completed successfully with
> diagnostics: Container released by application
>
>
> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>                   - Association with remote system [akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> 2018-09-03 06:34:19,214 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
> container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
> 2018-09-03 06:34:19,215 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
> for container container_e27_1535135887442_1381_01_000102 in state COMPLETE
> : exitStatus=-102 diagnostics=Container preempted by scheduler
> 2018-09-03 06:34:19,215 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Total
> number of failed containers so far: 1
> 2018-09-03 06:34:19,215 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting
> new TaskManager container with 20000 megabytes memory. Pending requests: 1
> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Task manager akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
> 2018-09-03 06:34:19,218 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
> state RUNNING to FAILING.
> java.lang.Exception: TaskManager was lost/killed:
> container_e27_1535135887442_1381_01_000102 @
> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
> 2018-09-03 06:34:19,466 INFO
> org.apache.flink.runtime.instance.InstanceManager             -
> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
> Number of registered task managers 144. Number of available slots 720
>
> 2018-09-03 06:34:24,717 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
> new container: container_e28_1535135887442_1381_01_000147 - Remaining
> pending container requests: 0
> 2018-09-03 06:34:24,717 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
> TaskManager in container ContainerInLaunch @ 1535956464717: Container:
> [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
> hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480,
> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>                   - Association with remote system [akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
> gated for [5000] ms. Reason: [Association failed with [akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
> 2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
>                   - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused:
> hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>                   - Association with remote system [akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
> gated for [5000] ms. Reason: [Association failed with [akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
> 2018-09-03 06:34:34,540 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
> state RESTARTING to CREATED.
> 2018-09-03 06:34:35,044 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
> state CREATED to RUNNING.
> 2018-09-03 06:34:35,195 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
> state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
> =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
> _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
> cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>
>
>
>
> 2018-09-03 06:34:39,248 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager
> container_e28_1535135887442_1381_01_000147 has started.
> 2018-09-03 06:34:45,235 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
> container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
> 2018-09-03 06:34:45,235 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
> for container container_e28_1535135887442_1381_01_000147 in state COMPLETE
> : exitStatus=-102 diagnostics=Container preempted by scheduler
> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Task manager akka.tcp://
> flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
> 2018-09-03 06:34:45,236 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Total
> number of failed containers so far: 2
> 2018-09-03 06:34:45,236 INFO
> org.apache.flink.runtime.instance.InstanceManager             -
> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
> Number of registered task managers 144. Number of available slots 720.
>
> 2018-09-03 06:34:45,236 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
> new container: container_e28_1535135887442_1381_01_000202 - Remaining
> pending container requests: 0
> 2018-09-03 06:34:45,236 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
> TaskManager in container ContainerInLaunch @ 1535956485236: Container:
> [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId:
> hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480,
> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
> 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
> 2018-09-03 06:34:45,241 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
> new container: container_e28_1535135887442_1381_01_000203 - Remaining
> pending container requests: 0
> 2018-09-03 06:34:45,241 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Returning
> excess container container_e28_1535135887442_1381_01_000203
>
> Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202
> has started" message. Instead I see:
> 2018-09-03 06:34:56,894 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
> state FAILING to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>
>
> 2018-09-03 06:34:57,005 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting
> down cluster with status FAILED : The monitored job with ID
> 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
> 2018-09-03 06:34:57,007 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                -
> Unregistering application from the YARN Resource Manager
>
>
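
To see the race in the log above, it helps to line up the timestamps. The sketch below uses four lines abbreviated from that log (timestamps unchanged, messages shortened here) and prints each event's offset from the container request. The helper names are hypothetical.

```python
from datetime import datetime

# Abbreviated from the quoted JobManager log; only the timestamps are verbatim.
LOG_EXCERPT = """\
2018-09-03 06:34:19,215 Requesting new TaskManager container
2018-09-03 06:34:24,717 Received new container container_e28_1535135887442_1381_01_000147
2018-09-03 06:34:35,195 Job switched from RUNNING to FAILING (NoResourceAvailableException)
2018-09-03 06:34:39,248 TaskManager container_e28_1535135887442_1381_01_000147 has started
"""


def parse_ts(line: str) -> datetime:
    """Parse the leading 'YYYY-MM-DD HH:MM:SS,mmm' timestamp of a log line."""
    return datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")


def offsets_from_first(lines):
    """Seconds elapsed between the first line and each line."""
    stamps = [parse_ts(line) for line in lines]
    return [(stamp - stamps[0]).total_seconds() for stamp in stamps]


lines = LOG_EXCERPT.splitlines()
offsets = offsets_from_first(lines)
for offset, line in zip(offsets, lines):
    print(f"+{offset:6.3f}s  {line[24:]}")
```

In this excerpt the job hits the NoResourceAvailableException about 16 s after the container request, roughly 4 s before the replacement TaskManager is reported as started, so that restart attempt is spent while the new TM is still coming up.
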
>
>
> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <ssuresh@salesforce.com
> > wrote:
>
>> Thanks Till,
>> I do not see any Akka related messages in that Taskmanager after the
>> initial startup. It seemed like all is well. So after the remotewatcher
>> detects it unreachable and the TaskManager unregisters it, I do not see any
>> other activity in the JobManager with regards to reallocation etc.
>> - Does the quarantining of the TaskManager not happen until  the
>> exit-on-fatal-akka-error is turned on ?
>> - Does the JobManager or the TaskManager try to reconnect to each other
>> again ? Is there a different setting for it ?
>> - Does the JobManager not reallocate a TaskManager despite it being
>> unregistered, until the TaskManager exits ? I think it should, especially
>> if it is not trying to establish a connection again.
>>
>> I will give the flag a try.
>>
>> Sincerely,
>>
>>
>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Could you check whether
>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running?
>>> E.g., does it try to reconnect to the JobManager? If so, then the
>>> container is still running and the YarnFlinkResourceManager thinks that
>>> everything is alright. You can make a TaskManager kill itself when it
>>> gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true`
>>> in the `flink-conf.yaml`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>>> ssuresh@salesforce.com> wrote:
>>>
>>>> Hi Till,
>>>> Greatly appreciate your reply.
>>>> We use version 1.4.2. I do not see anything unusual in the logs for the TM
>>>> that was lost. Note: I have looked at many such failures and see the same
>>>> pattern below.
>>>>
>>>> The JM logs above had most of what I had, but the below is what I have
>>>> when I search for flink.yarn (we have huge logs otherwise, given the amount
>>>> of SQL queries we run). The gist is: Akka detects the TM as unreachable, the
>>>> TM is marked lost and unregistered by the JM, operators start failing with
>>>> NoResourceAvailableException since there is one TM fewer, and 5 retry
>>>> attempts later the job goes down.
>>>>
>>>> ………….
>>>>
>>>> 2018-08-29 23:02:41,216 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>> TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>>
>>>> 2018-08-29 23:02:50,095 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>> TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>>
>>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Submitting job
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>
>>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Using restart strategy
>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>
>>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Running initialization on master for job
>>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>
>>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Successfully ran initialization on master in 0
>>>> ms.
>>>>
>>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Using application-defined state backend for
>>>> checkpoint/savepoint metadata: File State Backend @
>>>> hdfs://security-temp/savedSearches/checkpoint.
>>>>
>>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Scheduling job
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>
>>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Attempting to recover all jobs.
>>>>
>>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - There are 1 jobs to recover. Starting the job
>>>> recovery.
>>>>
>>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Attempting to recover job
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>
>>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Ignoring job recovery for
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>>
>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
>>>> terminated.
>>>>
>>>> at
>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>
>>>> at
>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>
>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Received message for non-existing checkpoint 1
>>>>
>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>> is in terminal state FAILED. Shutting down session
>>>>
>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Stopping JobManager with final application
>>>> status FAILED and diagnostics: The monitored job with ID
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>
>>>> 2018-08-29 23:15:26,129 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>> Shutting down cluster with status FAILED : The monitored job with ID
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>
>>>> 2018-08-29 23:15:26,146 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>> Unregistering application from the YARN Resource Manager
>>>>
>>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Deleting yarn application files under
>>>> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>>
>>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Stopping JobManager
>>>> akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>>>>
>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>                       - Actor system shut down timed out.
>>>>
>>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Shutdown completed. Stopping JVM.
>>>>
>>>>
>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Subramanya,
>>>>>
>>>>> in order to help you I need a little bit of context. Which version of
>>>>> Flink are you running? The configuration yarn.reallocate-failed is
>>>>> deprecated since version Flink 1.1 and does not have an effect anymore.
>>>>>
>>>>> What would be helpful is to get the full jobmanager log from you. If
>>>>> the YarnFlinkResourceManager gets notified that a container has failed, it
>>>>> should restart this container (it will do this 145 times). So if the
>>>>> YarnFlinkResourceManager does not get notified about a completed container,
>>>>> then this might indicate that the container is still running. So it would
>>>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>>>> say.
>>>>>
>>>>> Moreover, do you see the same problem occur when using the latest
>>>>> release Flink 1.6.0?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>>> ssuresh@salesforce.com> wrote:
>>>>>
>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>> then never re-allocated and subsequently operators fail with
>>>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>>>> restarts of 5) the application goes down.
>>>>>>
>>>>>>    - We have explicitly set *yarn.reallocate-failed: *true and have
>>>>>>    not specified the yarn.maximum-failed-containers (and see
>>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>>>    YARN application tolerates 145 failed TaskManager containers before giving
>>>>>>    up” in the logs).
>>>>>>    - After the initial startup where all 145 TaskManagers are
>>>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>>>    container” to reallocate failed container.
>>>>>>
>>>>>>
>>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>>                         - Detected unreachable: [akka.tcp://
>>>>>> flink@blahabc.sfdc.net:123]
>>>>>>
>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Task manager akka.tcp://
>>>>>> flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>
>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>>> (dataPort=124)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org
>>>>>> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>
>>>>>> at
>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>
>>>>>> at
>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>
>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>>
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>>
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>
>>>>>> at
>>>>>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>>
>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>
>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414
>>>>>> (dataPort=124)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org
>>>>>> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>
>>>>>> at
>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>
>>>>>> at
>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>
>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>>
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>>
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>
>>>>>> at
>>>>>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>>
>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>
>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> 2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>>>             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1.
>>>>>> Number of registered task managers 144. Number of available slots 720.
>>>>>>
>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Received message for non-existing
>>>>>> checkpoint 1
>>>>>>
>>>>>> 2018-08-29 23:14:39,969 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>> - Recovering checkpoints from ZooKeeper.
>>>>>>
>>>>>> 2018-08-29 23:14:39,975 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>> - Found 0 checkpoints in ZooKeeper.
>>>>>>
>>>>>> 2018-08-29 23:14:39,975 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>> - Trying to fetch 0 checkpoints from storage.
>>>>>>
>>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>>> Not enough free slots available to run the job. You can decrease the
>>>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>>>> configuration. Task to schedule: < Attempt #1
>>>>>>
>>>>>>
>>>>>> *After 5 retries of our Sql query execution graph (we have configured
>>>>>> 5 fixed delay restart), it outputs the below, *
>>>>>>
>>>>>>
>>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>     - Stopping checkpoint coordinator for job
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>>
>>>>>> 2018-08-29 23:15:22,216 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>> - Shutting down
>>>>>>
>>>>>> 2018-08-29 23:15:22,225 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>> - Removing
>>>>>> /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>> from ZooKeeper
>>>>>>
>>>>>> 2018-08-29 23:15:23,460 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
>>>>>> Shutting down.
>>>>>>
>>>>>> 2018-08-29 23:15:23,460 INFO
>>>>>> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
>>>>>> Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>>
>>>>>> 2018-08-29 23:15:24,114 INFO
>>>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>>> Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.
>>>>>>
>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Job with ID
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down
>>>>>> session
>>>>>>
>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>                         - Stopping JobManager with final application
>>>>>> status FAILED and diagnostics: The monitored job with ID
>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>
>>>>>> 2018-08-29 23:15:26,129 INFO
>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>>
>>>>>

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Subramanya Suresh <ss...@salesforce.com>.
Hi,
With some additional research,

*Before the flag*
I realized that for failed containers (those that exited for a specific
reason) we were still requesting a new TM container and launching a TM. But
for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
issue I do not see the container marked as failed, nor a subsequent request
for a TM.

*After taskmanager.exit-on-fatal-akka-error: true*
I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]"
messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am
not sure whether that is a coincidence or a direct effect of this change.

*Our Issue:*
I realized the application is still exiting, i.e. failing, when containers
are lost. The reason is that before
org.apache.flink.yarn.YarnFlinkResourceManager can acquire a new container,
launch a TM in it and see it reported as started, the
org.apache.flink.runtime.jobmanager.scheduler throws a
NoResourceAvailableException, which fails the job. We had a fixed-delay
restart strategy with 5 attempts, and we run out of attempts because of
this. I am looking to solve this with a FailureRateRestartStrategy over a
2 minute interval (10 second restart delay, >12 failures), which gives the
TM time to come back (it takes about 50 seconds).
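
To make the arithmetic behind this concrete, here is a rough back-of-the-envelope model in Python (not Flink code). It assumes the TM is lost at t=0, that each restart is attempted one restart delay after the previous failure, and that a restart attempted before the replacement TM is up fails right away with NoResourceAvailableException. The function names and the already_used parameter are made up for illustration; already_used only matters if attempts consumed by earlier incidents still count against the fixed-delay budget, which I have not verified for our version.

```python
import math


def restarts_needed(recovery_s: float, delay_s: float) -> int:
    """Number of restart attempts until one lands at or after recovery_s.

    Simplified model: TM lost at t=0, one restart every delay_s seconds,
    and any restart before the replacement TM is up fails immediately.
    """
    return max(1, math.ceil(recovery_s / delay_s))


def fixed_delay_survives(recovery_s, delay_s, max_attempts, already_used=0):
    """True if the fixed-delay attempt budget covers the whole outage.

    already_used is a hypothetical knob for attempts spent on earlier
    incidents (assumption: they are not reset).
    """
    return already_used + restarts_needed(recovery_s, delay_s) <= max_attempts


def failure_rate_survives(recovery_s, delay_s, max_failures, interval_s):
    """True if the failures within one interval stay within max_failures."""
    # Failures are the initial TM loss plus every restart that fails.
    failed_restarts = restarts_needed(recovery_s, delay_s) - 1
    failure_times = [i * delay_s for i in range(failed_restarts + 1)]
    # Count the failures inside the window that ends at the last failure.
    last = failure_times[-1]
    in_window = [t for t in failure_times if last - t < interval_s]
    return len(in_window) <= max_failures


if __name__ == "__main__":
    # Numbers from this thread: 10 s delay, about 50 s for the TM to return.
    print(restarts_needed(50, 10))
    print(fixed_delay_survives(50, 10, max_attempts=5))
    print(fixed_delay_survives(50, 10, max_attempts=5, already_used=1))
    # Longer outage, e.g. the replacement container gets preempted too.
    print(fixed_delay_survives(100, 10, max_attempts=5))
    print(failure_rate_survives(100, 10, max_failures=12, interval_s=120))
```

In this model, a 50 second recovery with a 10 second delay needs all 5 fixed-delay attempts and leaves no margin. One attempt consumed earlier, or a longer outage, exhausts the budget, while 12 failures per 2 minutes still has headroom.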

*Flink Bug*
But I cannot help wondering why there is no interaction between the
ResourceManager and the JobManager, i.e. why does the JobManager continue
with the processing despite not having the required TMs?

Logs to substantiate the above (only the relevant lines):

2018-09-03 06:17:13,932 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Container
container_e28_1535135887442_1381_01_000097 completed successfully with
diagnostics: Container released by application


2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
                - Association with remote system [akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
gated for [5000] ms. Reason: [Disassociated]
2018-09-03 06:34:19,214 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Container
container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
2018-09-03 06:34:19,215 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
for container container_e27_1535135887442_1381_01_000102 in state COMPLETE
: exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:19,215 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Total
number of failed containers so far: 1
2018-09-03 06:34:19,215 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting
new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
                - Task manager akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
2018-09-03 06:34:19,218 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
container_e27_1535135887442_1381_01_000102 @
hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
2018-09-03 06:34:19,466 INFO
org.apache.flink.runtime.instance.InstanceManager             -
Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
Number of registered task managers 144. Number of available slots 720

2018-09-03 06:34:24,717 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Received
new container: container_e28_1535135887442_1381_01_000147 - Remaining
pending container requests: 0
2018-09-03 06:34:24,717 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
TaskManager in container ContainerInLaunch @ 1535956464717: Container:
[ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>,
Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041
}, ] on host hello-world9-27-crz.ops.sfdc.net
2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
                - Association with remote system [akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
gated for [5000] ms. Reason: [Association failed with [akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
                - Remote connection to [null] failed with
java.net.ConnectException: Connection refused:
hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
                - Association with remote system [akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now
gated for [5000] ms. Reason: [Association failed with [akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
state RESTARTING to CREATED.

2018-09-03 06:34:35,044 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
state CREATED to RUNNING.
2018-09-03 06:34:35,195 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
(Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
=(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
_UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
=(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
_UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF




2018-09-03 06:34:39,248 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager
container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,235 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Container
container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
2018-09-03 06:34:45,235 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
for container container_e28_1535135887442_1381_01_000147 in state COMPLETE
: exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
                - Task manager akka.tcp://
flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
2018-09-03 06:34:45,236 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Total
number of failed containers so far: 2
2018-09-03 06:34:45,236 INFO
org.apache.flink.runtime.instance.InstanceManager             -
Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
Number of registered task managers 144. Number of available slots 720.

2018-09-03 06:34:45,236 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Received
new container: container_e28_1535135887442_1381_01_000202 - Remaining
pending container requests: 0
2018-09-03 06:34:45,236 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
TaskManager in container ContainerInLaunch @ 1535956485236: Container:
[ContainerId: container_e28_1535135887442_1381_01_000202, NodeId:
hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>,
Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041
}, ] on host hello-world4-31-crz.ops.sfdc.net
2018-09-03 06:34:45,241 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Received
new container: container_e28_1535135887442_1381_01_000203 - Remaining
pending container requests: 0
2018-09-03 06:34:45,241 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Returning
excess container container_e28_1535135887442_1381_01_000203

Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202
has started." line. Instead I see:
2018-09-03 06:34:56,894 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
state FAILING to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
(Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
=(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
_UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R


2018-09-03 06:34:57,005 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting
down cluster with status FAILED : The monitored job with ID
96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
2018-09-03 06:34:57,007 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                -
Unregistering application from the YARN Resource Manager
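
A small Python sketch for checking a JobManager log for this pattern, i.e. containers that were launched but never reported as started. It relies only on the two YarnFlinkResourceManager message formats visible in the excerpts above ("Launching TaskManager in container ... ContainerId: ..." and "TaskManager ... has started"). The function name and the SAMPLE text (trimmed from the logs above) are illustrative, and other Flink versions may word these messages differently.

```python
import re
from typing import List

# Message formats taken from the log excerpts in this thread.
LAUNCH_RE = re.compile(
    r"Launching TaskManager in container .*?ContainerId: (container_\w+)"
)
STARTED_RE = re.compile(r"TaskManager (container_\w+) has started")


def launched_but_not_started(log_text: str) -> List[str]:
    """Return container IDs with a launch message but no 'has started' message."""
    # Mail clients wrap long log lines, so collapse all whitespace first.
    flat = " ".join(log_text.split())
    launched = LAUNCH_RE.findall(flat)
    started = set(STARTED_RE.findall(flat))
    return [container for container in launched if container not in started]


# Trimmed sample built from the lines quoted above.
SAMPLE = """
2018-09-03 06:34:24,717 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
TaskManager in container ContainerInLaunch @ 1535956464717: Container:
[ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
hello-world9-27-crz.ops.sfdc.net:8041
2018-09-03 06:34:39,248 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager
container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,236 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
TaskManager in container ContainerInLaunch @ 1535956485236: Container:
[ContainerId: container_e28_1535135887442_1381_01_000202, NodeId:
hello-world4-31-crz.ops.sfdc.net:8041
"""

if __name__ == "__main__":
    for container in launched_but_not_started(SAMPLE):
        print(container)
```

On the sample this reports only container_e28_1535135887442_1381_01_000202, which matches what I saw by eye.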




On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <ss...@salesforce.com>
wrote:

> Thanks Till,
> I do not see any Akka-related messages in that TaskManager's log after the
> initial startup. It seemed like all was well. So after the RemoteWatcher
> detects it as unreachable and the JobManager unregisters it, I do not see any
> other activity in the JobManager with regard to reallocation etc.
> - Does the quarantining of the TaskManager not happen until
> exit-on-fatal-akka-error is turned on?
> - Do the JobManager and the TaskManager try to reconnect to each other
> again? Is there a different setting for it?
> - Does the JobManager not reallocate a TaskManager despite it being
> unregistered, until the TaskManager exits? I think it should, especially
> if it is not trying to establish a connection again.
>
> I will give the flag a try.
>
> Sincerely,
>
>
> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Could you check whether
>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running? E.g.
>> does it try to reconnect to the JobManager? If this is the case, then the
>> container is still running and the YarnFlinkResourceManager thinks that
>> everything is alright. You can make a TaskManager kill itself when it gets
>> quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in
>> the `flink-conf.yaml`.
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>> ssuresh@salesforce.com> wrote:
>>
>>> Hi Till,
>>> Greatly appreciate your reply.
>>> We use version 1.4.2. I do not see anything unusual in the logs for the TM
>>> that was lost. Note: I have looked at many such failures and see the same
>>> pattern below.
>>>
>>> The JM logs above had most of what I had, but below is what I get
>>> when I search for flink.yarn (we have huge logs otherwise, given the number
>>> of SQL queries we run). The gist is: Akka detects the TM as unreachable, the
>>> TM is marked lost and unregistered by the JM, operators start failing with
>>> NoResourceAvailableException since there is one TM fewer, and 5 retry
>>> attempts later the job goes down.
>>>
>>> ………….
>>>
>>> 2018-08-29 23:02:41,216 INFO
>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>> TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>
>>> 2018-08-29 23:02:50,095 INFO
>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>> TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>
>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335
>>> (streaming-searches-prod).
>>>
>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Using restart strategy
>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>
>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Running initialization on master for job
>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>
>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Successfully ran initialization on master in 0 ms.
>>>
>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Using application-defined state backend for
>>> checkpoint/savepoint metadata: File State Backend @
>>> hdfs://security-temp/savedSearches/checkpoint.
>>>
>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335
>>> (streaming-searches-prod).
>>>
>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Attempting to recover all jobs.
>>>
>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - There are 1 jobs to recover. Starting the job
>>> recovery.
>>>
>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Attempting to recover job
>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>
>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Ignoring job recovery for
>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>
>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
>>> terminated.
>>>
>>> at
>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>
>>> at
>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>
>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Received message for non-existing checkpoint 1
>>>
>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is
>>> in terminal state FAILED. Shutting down session
>>>
>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Stopping JobManager with final application status
>>> FAILED and diagnostics: The monitored job with ID
>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>
>>> 2018-08-29 23:15:26,129 INFO
>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>> Shutting down cluster with status FAILED : The monitored job with ID
>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>
>>> 2018-08-29 23:15:26,146 INFO
>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>> Unregistering application from the YARN Resource Manager
>>>
>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Deleting yarn application files under
>>> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>
>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Stopping JobManager
>>> akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>>>
>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>                     - Actor system shut down timed out.
>>>
>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>                     - Shutdown completed. Stopping JVM.
>>>
>>>
>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Subramanya,
>>>>
>>>> in order to help you I need a little bit of context. Which version of
>>>> Flink are you running? The configuration yarn.reallocate-failed is
>>>> deprecated since version Flink 1.1 and does not have an effect anymore.
>>>>
>>>> What would be helpful is to get the full jobmanager log from you. If
>>>> the YarnFlinkResourceManager gets notified that a container has failed, it
>>>> should restart this container (it will do this 145 times). So if the
>>>> YarnFlinkResourceManager does not get notified about a completed container,
>>>> then this might indicate that the container is still running. So it would
>>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>>> say.
>>>>
>>>> Moreover, do you see the same problem occur when using the latest
>>>> release Flink 1.6.0?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>> ssuresh@salesforce.com> wrote:
>>>>
>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and then
>>>>> never re-allocated and subsequently operators fail with
>>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>>> restarts of 5) the application goes down.
>>>>>
>>>>>    - We have explicitly set *yarn.reallocate-failed: *true and have
>>>>>    not specified the yarn.maximum-failed-containers (and see
>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>>    YARN application tolerates 145 failed TaskManager containers before giving
>>>>>    up” in the logs).
>>>>>    - After the initial startup where all 145 TaskManagers are
>>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>>    container” to reallocate failed container.
>>>>>
>>>>>
>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>                       - Detected unreachable:
>>>>> [akka.tcp://flink@blahabc.sfdc.net:123]
>>>>>
>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Task manager
>>>>> akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>
>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>> (dataPort=124)
>>>>>
>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>
>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>
>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>
>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>
>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>
>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>
>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>
>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>
>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>
>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>
>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>
>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>
>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>
>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>
>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>
>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>
>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>
>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414
>>>>> (dataPort=124)
>>>>>
>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>
>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>
>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>
>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>
>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>
>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>
>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>
>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>
>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>
>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>
>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>
>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>
>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>
>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>
>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>
>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>
>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>
>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> 2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>>             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1.
>>>>> Number of registered task managers 144. Number of available slots 720.
>>>>>
>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Received message for non-existing checkpoint 1
>>>>>
>>>>> 2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>   - Recovering checkpoints from ZooKeeper.
>>>>>
>>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>   - Found 0 checkpoints in ZooKeeper.
>>>>>
>>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>   - Trying to fetch 0 checkpoints from storage.
>>>>>
>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>>> Not enough free slots available to run the job. You can decrease the
>>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>>> configuration. Task to schedule: < Attempt #1
>>>>>
>>>>>
>>>>> *After 5 retries of our Sql query execution graph (we have configured
>>>>> 5 fixed delay restart), it outputs the below, *
>>>>>
>>>>>
>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>     - Stopping checkpoint coordinator for job
>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>
>>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>   - Shutting down
>>>>>
>>>>> 2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>>>>   - Removing
>>>>> /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> from ZooKeeper
>>>>>
>>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter
>>>>>   - Shutting down.
>>>>>
>>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter
>>>>>   - Removing
>>>>> /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>
>>>>> 2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore
>>>>>   - Removed job graph
>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.
>>>>>
>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> is in terminal state FAILED. Shutting down session
>>>>>
>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>                       - Stopping JobManager with final application
>>>>> status FAILED and diagnostics: The monitored job with ID
>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>
>>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>                 - Shutting down cluster with status
>>>>> FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>> has failed to complete.
>>>>>
>>>>> 2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>                 - Unregistering application from the
>>>>> YARN Resource Manager
>>>>>
>>>>> 2018-08-29 23:15:27,997 INFO  org.apache.flink.runtime.history.FsJobArchivist
>>>>>               - Job ab7e96a1659fbb4bfb3c6cd9bccb0335 has been
>>>>> archived at hdfs:/savedSearches/prod/completed-jobs/ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>
>>>>>
>>>>> Cheers,
>>>>>
>>>>
>>>

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Subramanya Suresh <ss...@salesforce.com>.
Thanks Till,
I do not see any Akka-related messages in that TaskManager's log after the
initial startup; all seemed well. After the RemoteWatcher detects it as
unreachable and the JobManager unregisters it, I do not see any other
activity in the JobManager with regard to reallocation.
- Does the quarantining of the TaskManager not happen unless
exit-on-fatal-akka-error is turned on?
- Do the JobManager and the TaskManager try to reconnect to each other
again? Is there a different setting for that?
- Does the JobManager not reallocate a TaskManager, despite it being
unregistered, until the TaskManager exits? I think it should, especially
if it is not trying to establish a connection again.

I will give the flag a try.

Sincerely,


On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <tr...@apache.org> wrote:

> Could you check whether akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
> is still running, e.g. whether it tries to reconnect to the JobManager?
> If this is the case, then the container is still running and the
> YarnFlinkResourceManager thinks that everything is alright. You can make
> a TaskManager kill itself when it gets quarantined by setting
> `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.
>
> Cheers,
> Till
>
> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <ss...@salesforce.com>
> wrote:
>
>> Hi Till,
>> Greatly appreciate your reply.
>> We use version 1.4.2. I do not see anything unusual in the logs for the TM
>> that was lost. Note: I have looked at many such failures and see the same
>> pattern below.
>>
>> The JM logs above had most of what I had, but below is what I get
>> when I search for flink.yarn (we have huge logs otherwise, given the number
>> of SQL queries we run). The gist: Akka detects the TM as unreachable, the TM
>> is marked lost and unregistered by the JM, operators start failing with
>> NoResourceAvailableException since there is one TM fewer, and 5 retry
>> attempts later the job goes down.
>>
>> ………….
>>
>> 2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.
>> YarnFlinkResourceManager                - TaskManager
>> container_e27_1535135887442_0906_01_000124 has started.
>>
>> 2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.
>> YarnFlinkResourceManager                - TaskManager
>> container_e27_1535135887442_0906_01_000159 has started.
>>
>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335
>> (streaming-searches-prod).
>>
>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>
>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Running initialization on master for job
>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>
>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Successfully ran initialization on master in 0 ms.
>>
>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Using application-defined state backend for
>> checkpoint/savepoint metadata: File State Backend @ hdfs://security-temp/
>> savedSearches/checkpoint.
>>
>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335
>> (streaming-searches-prod).
>>
>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Attempting to recover all jobs.
>>
>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - There are 1 jobs to recover. Starting the job
>> recovery.
>>
>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Attempting to recover job
>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>
>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Ignoring job recovery for
>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>
>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
>> terminated.
>>
>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.
>> applyOrElse(YarnJobManager.scala:110)
>>
>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.
>> applyOrElse(YarnJobManager.scala:110)
>>
>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Received message for non-existing checkpoint 1
>>
>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in
>> terminal state FAILED. Shutting down session
>>
>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Stopping JobManager with final application status
>> FAILED and diagnostics: The monitored job with ID
>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>
>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.
>> YarnFlinkResourceManager                - Shutting down cluster with
>> status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>> has failed to complete.
>>
>> 2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.
>> YarnFlinkResourceManager                - Unregistering application from
>> the YARN Resource Manager
>>
>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Deleting yarn application files under
>> hdfs://security-temp/user/sec-data-app/.flink/application_
>> 1535135887442_0906.
>>
>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Stopping JobManager
>> akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>>
>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>                     - Actor system shut down timed out.
>>
>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>                     - Shutdown completed. Stopping JVM.
>>
>>
>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Subramanya,
>>>
>>> in order to help you I need a little bit of context. Which version of
>>> Flink are you running? The configuration yarn.reallocate-failed is
>>> deprecated since version Flink 1.1 and does not have an effect anymore.
>>>
>>> What would be helpful is to get the full jobmanager log from you. If the
>>> YarnFlinkResourceManager gets notified that a container has failed, it
>>> should restart this container (it will do this 145 times). So if the
>>> YarnFlinkResourceManager does not get notified about a completed container,
>>> then this might indicate that the container is still running. So it would
>>> be good to check what the logs of container_e27_1535135887442_0906_01_000039
>>> say.
>>>
>>> Moreover, do you see the same problem occur when using the latest
>>> release Flink 1.6.0?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>> ssuresh@salesforce.com> wrote:
>>>
>>>> Hi, we are seeing a weird issue where one TaskManager is lost and then
>>>> never re-allocated and subsequently operators fail with
>>>> NoResourceAvailableException and after 5 restarts (we have FixedDelay
>>>> restarts of 5) the application goes down.
>>>>
>>>>    - We have explicitly set *yarn.reallocate-failed: *true and have
>>>>    not specified the yarn.maximum-failed-containers (and see
>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>    YARN application tolerates 145 failed TaskManager containers before giving
>>>>    up” in the logs).
>>>>    - After the initial startup where all 145 TaskManagers are
>>>>    requested I never see any logs saying “Requesting new TaskManager
>>>>    container” to reallocate failed container.
>>>>
>>>>
>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>                       - Detected unreachable: [akka.tcp://
>>>> flink@blahabc.sfdc.net:123]
>>>>
>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Task manager akka.tcp://flink@blahabc.sfdc.
>>>> net:123/user/taskmanager terminated.
>>>>
>>>> java.lang.Exception: TaskManager was lost/killed:
>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>> (dataPort=124)
>>>>
>>>> at org.apache.flink.runtime.instance.SimpleSlot.
>>>> releaseSlot(SimpleSlot.java:217)
>>>>
>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
>>>> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>
>>>> at org.apache.flink.runtime.instance.SharedSlot.
>>>> releaseSlot(SharedSlot.java:192)
>>>>
>>>> at org.apache.flink.runtime.instance.Instance.markDead(
>>>> Instance.java:167)
>>>>
>>>> at org.apache.flink.runtime.instance.InstanceManager.
>>>> unregisterTaskManager(InstanceManager.java:212)
>>>>
>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$
>>>> apache$flink$runtime$jobmanager$JobManager$$
>>>> handleTaskManagerTerminated(JobManager.scala:1198)
>>>>
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>
>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>> AbstractPartialFunction.scala:36)
>>>>
>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$
>>>> anonfun$handleContainerMessage$1.applyOrElse(
>>>> ContaineredJobManager.scala:107)
>>>>
>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>
>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.
>>>> applyOrElse(YarnJobManager.scala:110)
>>>>
>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>
>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
>>>> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>
>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>> AbstractPartialFunction.scala:36)
>>>>
>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>> LogMessages.scala:33)
>>>>
>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>> LogMessages.scala:28)
>>>>
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>
>>>> at org.apache.flink.runtime.LogMessages$$anon$1.
>>>> applyOrElse(LogMessages.scala:28)
>>>>
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>
>>>> at org.apache.flink.runtime.jobmanager.JobManager.
>>>> aroundReceive(JobManager.scala:122)
>>>>
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>
>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
>>>> DeathWatch.scala:46)
>>>>
>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>
>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>> runTask(ForkJoinPool.java:1339)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>> ForkJoinPool.java:1979)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>> ForkJoinWorkerThread.java:107)
>>>>
>>>> java.lang.Exception: TaskManager was lost/killed:
>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414
>>>> (dataPort=124)
>>>>
>>>> at org.apache.flink.runtime.instance.SimpleSlot.
>>>> releaseSlot(SimpleSlot.java:217)
>>>>
>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.
>>>> releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>
>>>> at org.apache.flink.runtime.instance.SharedSlot.
>>>> releaseSlot(SharedSlot.java:192)
>>>>
>>>> at org.apache.flink.runtime.instance.Instance.markDead(
>>>> Instance.java:167)
>>>>
>>>> at org.apache.flink.runtime.instance.InstanceManager.
>>>> unregisterTaskManager(InstanceManager.java:212)
>>>>
>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$
>>>> apache$flink$runtime$jobmanager$JobManager$$
>>>> handleTaskManagerTerminated(JobManager.scala:1198)
>>>>
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>> anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>
>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>> AbstractPartialFunction.scala:36)
>>>>
>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$
>>>> anonfun$handleContainerMessage$1.applyOrElse(
>>>> ContaineredJobManager.scala:107)
>>>>
>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>
>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.
>>>> applyOrElse(YarnJobManager.scala:110)
>>>>
>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>
>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
>>>> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>
>>>> at scala.runtime.AbstractPartialFunction.apply(
>>>> AbstractPartialFunction.scala:36)
>>>>
>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>> LogMessages.scala:33)
>>>>
>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
>>>> LogMessages.scala:28)
>>>>
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>
>>>> at org.apache.flink.runtime.LogMessages$$anon$1.
>>>> applyOrElse(LogMessages.scala:28)
>>>>
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>
>>>> at org.apache.flink.runtime.jobmanager.JobManager.
>>>> aroundReceive(JobManager.scala:122)
>>>>
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>
>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
>>>> DeathWatch.scala:46)
>>>>
>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>
>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>> runTask(ForkJoinPool.java:1339)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>> ForkJoinPool.java:1979)
>>>>
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>> ForkJoinWorkerThread.java:107)
>>>>
>>>> 2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager
>>>>             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1.
>>>> Number of registered task managers 144. Number of available slots 720.
>>>>
>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Received message for non-existing checkpoint 1
>>>>
>>>> 2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from
>>>> ZooKeeper.
>>>>
>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.
>>>>
>>>> 2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints
>>>> from storage.
>>>>
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #1
>>>>
>>>>
>>>> *After 5 retries of our Sql query execution graph (we have configured 5
>>>> fixed delay restart), it outputs the below, *
>>>>
>>>>
>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>     - Stopping checkpoint coordinator for job
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>
>>>> 2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCompletedCheckpointStore  - Shutting down
>>>>
>>>> 2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCompletedCheckpointStore  - Removing /prod/link/application_
>>>> 1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from
>>>> ZooKeeper
>>>>
>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCheckpointIDCounter  - Shutting down.
>>>>
>>>> 2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.
>>>> ZooKeeperCheckpointIDCounter  - Removing /checkpoint-counter/
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>
>>>> 2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.
>>>> ZooKeeperSubmittedJobGraphStore  - Removed job graph
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.
>>>>
>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>> is in terminal state FAILED. Shutting down session
>>>>
>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                       - Stopping JobManager with final application
>>>> status FAILED and diagnostics: The monitored job with ID
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>
>>>> 2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.
>>>> YarnFlinkResourceManager                - Shutting down cluster with
>>>> status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>> has failed to complete.
>>>>
>>>> 2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.
>>>> YarnFlinkResourceManager                - Unregistering application
>>>> from the YARN Resource Manager
>>>>
>>>> 2018-08-29 23:15:27,997 INFO  org.apache.flink.runtime.history.FsJobArchivist
>>>>               - Job ab7e96a1659fbb4bfb3c6cd9bccb0335 has been archived
>>>> at hdfs:/savedSearches/prod/completed-jobs/
>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>
>>>>
>>>> Cheers,
>>>>
>>>
>>
>>

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Till Rohrmann <tr...@apache.org>.
Could you check whether
akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running,
e.g. whether it tries to reconnect to the JobManager? If this is the case,
then the container is still running and the YarnFlinkResourceManager thinks
that everything is alright. You can make a TaskManager kill itself when it
gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in
the `flink-conf.yaml`.
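
A minimal sketch of that setting, assuming the legacy YARN mode of Flink 1.4.x discussed in this thread. The comments are explanatory additions, not part of the original message:

```yaml
# flink-conf.yaml
# Terminate the TaskManager JVM on a fatal Akka error (such as being
# quarantined), so that the YARN container completes and the
# YarnFlinkResourceManager is notified about it.
taskmanager.exit-on-fatal-akka-error: true
```

The setting is presumably only picked up by TaskManagers started after the configuration change, so the YARN application would need to be redeployed.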

Cheers,
Till

On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <ss...@salesforce.com>
wrote:

> Hi Till,
> Greatly appreciate your reply.
> We use version 1.4.2. I do not see anything unusual in the logs for the TM
> that was lost. Note: I have looked at many such failures and see the same
> pattern below.
>
> The JM logs above had most of what I had, but the below is what I have
> when I search for flink.yarn (we have huge logs otherwise, given the amount
> of SQL queries we run). The gist is: Akka detects the TM as unreachable, the
> TM is marked lost and unregistered by the JM, operators start failing with
> NoResourceAvailableException since there is one less TM, and 5 retry
> attempts later the job goes down.
>
> ………….
>
> 2018-08-29 23:02:41,216 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                -
> TaskManager container_e27_1535135887442_0906_01_000124 has started.
>
> 2018-08-29 23:02:50,095 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                -
> TaskManager container_e27_1535135887442_0906_01_000159 has started.
>
> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335
> (streaming-searches-prod).
>
> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Using restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>
> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Running initialization on master for job
> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>
> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Successfully ran initialization on master in 0 ms.
>
> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Using application-defined state backend for
> checkpoint/savepoint metadata: File State Backend @
> hdfs://security-temp/savedSearches/checkpoint.
>
> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335
> (streaming-searches-prod).
>
> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Attempting to recover all jobs.
>
> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>                   - There are 1 jobs to recover. Starting the job
> recovery.
>
> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Attempting to recover job
> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>
> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Ignoring job recovery for
> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>
> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
> terminated.
>
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>
> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Received message for non-existing checkpoint 1
>
> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in
> terminal state FAILED. Shutting down session
>
> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Stopping JobManager with final application status
> FAILED and diagnostics: The monitored job with ID
> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>
> 2018-08-29 23:15:26,129 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting
> down cluster with status FAILED : The monitored job with ID
> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>
> 2018-08-29 23:15:26,146 INFO
> org.apache.flink.yarn.YarnFlinkResourceManager                -
> Unregistering application from the YARN Resource Manager
>
> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Deleting yarn application files under
> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>
> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Stopping JobManager
> akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>
> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>                   - Actor system shut down timed out.
>
> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>                   - Shutdown completed. Stopping JVM.
>

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Subramanya Suresh <ss...@salesforce.com>.
Hi Till,
Greatly appreciate your reply.
We use version 1.4.2. I do not see anything unusual in the logs for the TM
that was lost. Note: I have looked at many such failures and see the same
pattern below.

The JM logs above had most of what I had, but the below is what I have when
I search for flink.yarn (we have huge logs otherwise, given the amount of
SQL queries we run). The gist is: Akka detects the TM as unreachable, the TM
is marked lost and unregistered by the JM, operators start failing with
NoResourceAvailableException since there is one less TM, and 5 retry
attempts later the job goes down.
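
For reference, the restart behaviour reported in the log below (FixedDelayRestartStrategy with maxNumberRestartAttempts=5 and delayBetweenRestartAttempts=10000) would correspond to roughly the following flink-conf.yaml entries. This is only a sketch, assuming the strategy is set in the configuration file. It could equally be set in the job code:

```yaml
# flink-conf.yaml (assumed location of the restart settings)
restart-strategy: fixed-delay
# Give up after 5 failed restart attempts.
restart-strategy.fixed-delay.attempts: 5
# Wait 10 seconds (10000 ms) between attempts.
restart-strategy.fixed-delay.delay: 10 s
```

With these values, a lost TaskManager that is not replaced within the five attempts is enough to take the whole job down.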

………….

2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
              - TaskManager container_e27_1535135887442_0906_01_000124 has
started.

2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
              - TaskManager container_e27_1535135887442_0906_01_000159 has
started.

2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
                - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335
(streaming-searches-prod).

2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
                - Using restart strategy
FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
                - Running initialization on master for job
streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).

2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
                - Successfully ran initialization on master in 0 ms.

2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
                - Using application-defined state backend for
checkpoint/savepoint metadata: File State Backend @
hdfs://security-temp/savedSearches/checkpoint.

2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
                - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335
(streaming-searches-prod).

2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
                - Attempting to recover all jobs.

2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
                - There are 1 jobs to recover. Starting the job recovery.

2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
                - Attempting to recover job
ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
                - Ignoring job recovery for
ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
                - Task manager
akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager
terminated.

at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
                - Received message for non-existing checkpoint 1

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
                - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in
terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
                - Stopping JobManager with final application status FAILED
and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335
has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
              - Shutting down cluster with status FAILED : The monitored
job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
              - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
                - Deleting yarn application files under
hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.

2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
                - Stopping JobManager
akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.

2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
                - Actor system shut down timed out.

2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
                - Shutdown completed. Stopping JVM.


On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Subramanya,
>
> in order to help you I need a little bit of context. Which version of
> Flink are you running? The configuration yarn.reallocate-failed is
> deprecated since version Flink 1.1 and does not have an effect anymore.
>
> What would be helpful is to get the full jobmanager log from you. If the
> YarnFlinkResourceManager gets notified that a container has failed, it
> should restart this container (it will do this 145 times). So if the
> YarnFlinkResourceManager does not get notified about a completed container,
> then this might indicate that the container is still running. So it would
> be good to check what the logs of container_e27_1535135887442_0906_01_000039
> say.
>
> Moreover, do you see the same problem occur when using the latest release
> Flink 1.6.0?
>
> Cheers,
> Till
>



Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Posted by Till Rohrmann <tr...@apache.org>.
Hi Subramanya,

In order to help you, I need a bit more context. Which version of Flink
are you running? The configuration option yarn.reallocate-failed has been
deprecated since Flink 1.1 and no longer has any effect.

It would help to see the full JobManager log. If the
YarnFlinkResourceManager is notified that a container has failed, it
should restart that container (it will do so up to 145 times). If the
YarnFlinkResourceManager is not notified about a completed container,
this might indicate that the container is still running. It would
therefore be good to check what the logs of
container_e27_1535135887442_0906_01_000039 say.
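
As a rough aid for that check, here is a minimal Python sketch that scans a
JobManager log for the messages quoted in this thread: which containers were
reported lost, how often a new container was requested, and the last reported
TaskManager/slot counts. The function name summarize and the returned keys are
made up for illustration, the message texts are taken from the log excerpts
above, and other Flink versions may word them differently.

```python
import re

# Message fragments copied from the log excerpts in this thread; they are
# assumptions about the log format, not a stable Flink interface.
LOST = re.compile(r"TaskManager was lost/killed:\s+(container_\w+)")
REQUESTED = re.compile(r"Requesting new TaskManager container")
REGISTERED = re.compile(
    r"Number of registered task managers (\d+)\.\s+"
    r"Number of available slots (\d+)"
)


def summarize(log_text):
    """Summarize container loss and re-request activity in a JobManager log."""
    # Distinct container IDs that were reported as lost or killed.
    lost = sorted(set(LOST.findall(log_text)))
    # Number of container requests, including those made at startup.
    requests = len(REQUESTED.findall(log_text))
    # Last reported (registered TaskManagers, available slots) pair, if any.
    last = None
    for match in REGISTERED.finditer(log_text):
        last = (int(match.group(1)), int(match.group(2)))
    return {
        "lost_containers": lost,
        "container_requests": requests,
        "last_registered": last,
    }


if __name__ == "__main__":
    import sys

    # Usage: python summarize_log.py /path/to/jobmanager.log
    with open(sys.argv[1], encoding="utf-8", errors="replace") as handle:
        print(summarize(handle.read()))
```

If the number of container requests does not increase after a container is
reported lost, that would be consistent with the YarnFlinkResourceManager never
being notified about the failed container.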

Also, does the same problem occur with the latest release, Flink 1.6.0?

Cheers,
Till
