Posted to user@flink.apache.org by Shannon Carey <sc...@expedia.com> on 2017/01/05 00:21:16 UTC

Rapidly failing job eventually causes "Not enough free slots"

In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice and I'm wondering if anyone has some insight about it.

In both cases, we deployed a job that fails very frequently (within 15s-1m of launch). Eventually, the Flink cluster dies.

The sequence of events looks something like this:

  *   bad job is launched
  *   bad job fails & is restarted many times (I didn't have the "failure-rate" restart strategy configuration right)
  *   Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner (SIGTERM handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
  *   At this point, the YARN resource manager also logs the container failure
  *   More logs: Container ResourceID{resourceId='container_1481658997383_0003_01_000013'} failed. Exit status: Pmem limit exceeded (-104)
  *   Diagnostics for container ResourceID{resourceId='container_1481658997383_0003_01_000013'} in state COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container [pid=21246,containerID=container_1481658997383_0003_01_000013] is running beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB physical memory used; 9.6 GB of 28.1 GB virtual memory used. Killing container.
      Container killed on request. Exit code is 143
      Container exited with a non-zero exit code 143
      Total number of failed containers so far: 12
      Stopping YARN session because the number of failed containers (12) exceeded the maximum failed containers (11). This number is controlled by the 'yarn.maximum-failed-containers' configuration setting. By default its the number of requested containers.
  *   From here onward, the logs repeatedly show that jobs fail to restart due to "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #68 (Source: …) @ (unassigned) - [SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in sharing group < SlotSharingGroup [73191c171abfff61fb5102c161274145, 19596f7834805c8409c419f0edab1f1b] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0"
  *   Eventually, Flink stops for some reason (with another SIGTERM message), presumably because of YARN

Does anyone have an idea why a bad job repeatedly failing would eventually result in the Flink cluster dying?

Any idea why I'd get "Pmem limit exceeded" or "Not enough free slots available to run the job"? The JVM heap usage and the free memory on the machines both look reasonable in my monitoring dashboards. Could it possibly be a memory leak due to classloading or something?

Thanks for any help or suggestions you can provide! I am hoping that the "failure-rate" restart strategy will help avoid this issue in the future, but I'd also like to understand what's making the cluster die so that I can prevent it.

-Shannon
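
Editorial note on the "failure-rate" restart strategy mentioned above: the general idea is to keep restarting a job only while the number of failures inside a sliding time window stays at or below a threshold, and to give up once it is exceeded. The following is a minimal Python sketch of that idea only. It is not Flink's implementation, and the class name, method name, and parameters are hypothetical.

```python
from collections import deque


class FailureRateTracker:
    """Illustrative sliding-window failure counter (hypothetical, not Flink code)."""

    def __init__(self, max_failures_per_interval, interval_s):
        self.max_failures = max_failures_per_interval
        self.interval_s = interval_s
        self._failures = deque()  # timestamps (seconds) of recent failures

    def record_failure(self, now_s):
        """Record a failure; return True if a restart is still allowed."""
        self._failures.append(now_s)
        # Forget failures that have fallen out of the time window.
        while self._failures and now_s - self._failures[0] > self.interval_s:
            self._failures.popleft()
        return len(self._failures) <= self.max_failures


tracker = FailureRateTracker(max_failures_per_interval=3, interval_s=300)
results = [tracker.record_failure(t) for t in (0, 30, 60, 90)]
print(results)
```

With a job that fails every 15s-1m, as described in the post, a tracker like this would refuse further restarts after a handful of failures instead of restarting indefinitely.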

Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Adding Ufuk and Till to this...

You are right, these issues should not compromise HA. Is it possible that
you share the logs to diagnose what the issue was?

@Till, Ufuk: Can it be that the ZooKeeper client gave up for good trying to
connect to ZooKeeper after a certain time?

Stephan


Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Shannon Carey <sc...@expedia.com>.
I am running 1.1.4. It does look like there were problems with the connection to Zookeeper due to overworking the network. I'm not sure what I can do about it (not sure what happens when a JM loses leadership), but ideally a cluster-wide failure would not result in losing all the jobs, checkpoints, etc.

-Shannon

From: Stephan Ewen <se...@apache.org>
Date: Tuesday, January 24, 2017 at 8:07 AM
To: <us...@flink.apache.org>
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Hi!

I think there were some issues in the HA recovery of 1.1.3 that were fixed in 1.1.4 and 1.2.0.
What version are you running on?

Stephan


On Sat, Jan 21, 2017 at 4:58 PM, Ufuk Celebi <uc...@apache.org> wrote:
Hey Shannon,

The final truth for recovery is in ZooKeeper. Can you check whether
there are also references available in ZooKeeper? Do you have the job
manager logs available from after the failure? On recovery, Flink
checks ZooKeeper for entries. These point to files in the storageDir.
It could have happened that these got out of sync, e.g. entries
deleted from ZK but not from the storageDir.

Maybe the loss of the task managers can also be explained by a
connection loss to ZK or something. When a JM loses leadership, the
TMs cancel all tasks and disconnect from the JM. Here again, we would
need to look into the logs.

Sorry for the bad experience lately :-(

– Ufuk


On Sat, Jan 21, 2017 at 4:31 AM, Shannon Carey <sc...@expedia.com> wrote:
> In fact, I can see all my job jar blobs and some checkpoint & job graph
> files in my configured "recovery.zookeeper.storageDir"… however for some
> reason it didn't get restored when my new Flink cluster started up.
>
>
> From: Shannon Carey <sc...@expedia.com>
> Date: Friday, January 20, 2017 at 9:14 PM
> To: "user@flink.apache.org" <us...@flink.apache.org>
>
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> I recently added some better visibility into the metrics we're gathering
> from Flink. My Flink cluster died again due to the "Not enough free slots
> available to run the job" problem, and this time I can see that the number
> of registered task managers went down from 11 to 7, then waffled and only
> ever got back up to 10 (one short of the requested amount) before dropping
> to 0 just before the cluster died. This would seem to explain why there
> weren't sufficient slots (given that we were probably using them all or
> nearly all)… The metric of "running jobs" went down from 5 to 3 during this
> time period as well. So the problem seems to be loss of taskmanagers due to
> errors (not yet sure what exactly as I have to delve into logs).
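
Editorial note: the slot arithmetic behind the observation above can be sketched as follows. The number of slots per task manager (4) and the number of slots the running jobs need (42) are hypothetical values chosen for illustration, since the thread does not state them. Only the task manager counts (11, 10, 7, 0) come from the message.

```python
def total_slots(task_managers, slots_per_task_manager):
    """Slots the scheduler can hand out with the currently registered task managers."""
    return task_managers * slots_per_task_manager


def can_run(required_slots, task_managers, slots_per_task_manager):
    """True if the registered task managers offer enough slots for the jobs."""
    return required_slots <= total_slots(task_managers, slots_per_task_manager)


SLOTS_PER_TM = 4      # assumption, not stated in the thread
REQUIRED_SLOTS = 42   # assumption: the jobs use nearly all slots of 11 task managers

for tms in (11, 10, 7, 0):
    print(tms, total_slots(tms, SLOTS_PER_TM), can_run(REQUIRED_SLOTS, tms, SLOTS_PER_TM))
```

Under these assumptions, even getting back to 10 of the 11 requested task managers leaves the cluster short of slots, and 0 task managers matches the "total number of slots=0" line in the exception.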
>
> The other thing I have to figure out is restoring the jobs… I thought that
> HA would start the jobs back up again if Flink died & I re-launched it, but
> that doesn't appear to be the case.
>
>
> From: Stephan Ewen <se...@apache.org>
> Date: Thursday, January 5, 2017 at 7:52 AM
> To: <us...@flink.apache.org>
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> Another thought on the container failure:
>
> In 1.1, the user code is loaded dynamically whenever a Task is started. That
> means that on every task restart the code is reloaded. For that to work
> properly, class unloading needs to happen, or the permgen will eventually
> overflow.
>
> It can happen that class unloading is prevented if the user functions
> leave references around as "GC roots", which may be threads, or references
> in registries, etc.
>
> In Flink 1.2, YARN will put the user code into the application classpath, so
> code need not be reloaded on every restart. That should solve that issue.
> To "simulate" that behavior in Flink 1.1, put your application code jars
> into the "lib" folder.
>
> Best,
> Stephan
>
>
> On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin <yu...@gmail.com> wrote:
>>
>> Hi,
>>
>> I've faced a similar issue recently. Hope sharing my findings will help.
>> The problem can be split into 2 parts:
>>
>> Source of container failures
>> The logs you provided indicate that YARN kills its containers for
>> exceeding memory limits. The important point here is that memory limit = JVM
>> heap memory + off-heap memory. So if off-heap memory usage is high, YARN may
>> kill containers even though JVM heap consumption is fine. To address this,
>> Flink reserves a share of container memory for off-heap memory. How much
>> is reserved is controlled by the yarn.heap-cutoff-ratio and
>> yarn.heap-cutoff-min configuration settings. By default, 25% of the requested
>> container memory is reserved for off-heap use. This seems to be a good
>> starting point, but one should experiment and tune to meet their job specifics.
>>
>> It's also worthwhile to figure out who consumes off-heap memory. Is it
>> Flink managed memory moved off heap (taskmanager.memory.off-heap = true)? Is
>> it some external library allocating something off heap? Is it your own code?
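
Editorial note: the heap cutoff described above can be worked through numerically. The sketch below assumes the cutoff is the larger of (ratio x container memory) and the configured minimum, and that the JVM heap gets whatever remains. The function name is hypothetical, the 384 MB minimum is an assumed example value (check the configuration reference for your Flink version), and 5734 MB is only an approximation of the 5.6 GB container limit shown in the logs.

```python
def heap_size_mb(container_mb, cutoff_ratio, cutoff_min_mb):
    """Approximate JVM heap left after reserving the off-heap cutoff.

    cutoff_ratio  corresponds to yarn.heap-cutoff-ratio (0.25 per the message above)
    cutoff_min_mb corresponds to yarn.heap-cutoff-min (value is an assumption here)
    """
    cutoff_mb = max(int(container_mb * cutoff_ratio), cutoff_min_mb)
    return container_mb - cutoff_mb


# Roughly the 5.6 GB container from the logs: about 1.4 GB is held back for off-heap use.
print(heap_size_mb(5734, 0.25, 384))

# For a small container the minimum cutoff dominates the ratio.
print(heap_size_mb(1024, 0.25, 384))
```

If off-heap usage (for example class metadata, thread stacks, or native libraries) grows beyond the reserved cutoff, the container can exceed its physical memory limit even while heap usage looks healthy, which is consistent with the "Pmem limit exceeded" diagnostics.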
>>
>> How Flink handles task manager failures
>> Whenever a task manager fails, the Flink jobmanager decides whether it
>> should:
>> - reallocate the failed task manager container
>> - fail the application entirely
>> These decisions can be guided by certain configuration
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
>> With default settings, the job manager reallocates task manager containers
>> up to the point when N failures have been observed, where N is the number of
>> requested task managers. After that the application is stopped.
>>
>> According to the logs, you have a finite number in
>> yarn.maximum-failed-containers (11, as I can see from the logs - this may be
>> set by Flink if not provided explicitly). On the 12th container failure, the
>> jobmanager gives up and the application stops. I'm not sure why it keeps
>> reporting not enough slots after that point. In my experience this may
>> happen when a job eats up all the available slots, so that after a container
>> failure its tasks cannot be restarted in other (live) containers. But I
>> believe once the decision to stop the application is made, there should not
>> be any further attempts to restart the job, hence no logs like those.
>> Hopefully, someone else will explain this to us :)
>>
>> In my case I made the jobmanager restart containers indefinitely by setting
>> yarn.maximum-failed-containers = -1, so that a taskmanager failure never
>> results in application death. Note that this is unlikely to be a good choice
>> for a batch job.
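
Editorial note: the stop condition described above can be expressed as a small sketch. It mirrors the wording of the log message ("the number of failed containers (12) exceeded the maximum failed containers (11)") and the -1 setting mentioned in the message. It is not taken from Flink's source code, and the function name is hypothetical.

```python
def should_stop_session(failed_containers, max_failed_containers):
    """Decide whether the YARN session gives up after a container failure.

    max_failed_containers models yarn.maximum-failed-containers;
    -1 is treated as "never give up", as described in the message above.
    """
    if max_failed_containers == -1:
        return False
    return failed_containers > max_failed_containers


print(should_stop_session(11, 11))    # limit reached but not exceeded
print(should_stop_session(12, 11))    # the situation in the logs
print(should_stop_session(1000, -1))  # unlimited container restarts
```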
>>
>> Regards,
>> Yury


Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I think there were some issues in the HA recovery of 1.1.3 that were fixed
in 1.1.4 and 1.2.0.
What version are you running on?

Stephan



Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Shannon,

The final source of truth for recovery is in ZooKeeper. Can you check whether
there are also references available in ZooKeeper? Do you have the job
manager logs available from after the failure? On recovery, Flink
checks ZooKeeper for entries. These point to files in the storageDir.
It could be that these got out of sync, e.g. entries were
deleted from ZK but not from the storageDir.

Maybe the loss of the task managers can also be explained by a
connection loss to ZK or something similar. When a JM loses leadership, the
TMs cancel all tasks and disconnect from the JM. Here again, we would
need to look into the logs.

Sorry for the bad experience lately :-(

– Ufuk
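
For reference, a minimal sketch of the HA settings that the recovery path described above depends on. The key names are the Flink 1.1 ones ("recovery.*"); every value below (hosts, paths, attempt count) is a placeholder and an assumption, not taken from this thread. One thing that may be worth checking is whether the relaunched cluster reads the same ZooKeeper path and the same storageDir as the cluster that died, since otherwise it would have no entries to recover from.

```yaml
# flink-conf.yaml fragment (Flink 1.1 key names; all values are placeholders)
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
# ZooKeeper node under which Flink keeps its recovery entries
recovery.zookeeper.path.root: /flink
# Durable storage for the job graphs and checkpoints the ZK entries point to
recovery.zookeeper.storageDir: hdfs:///flink/recovery
# How often YARN may restart the application master / job manager
yarn.application-attempts: 10
```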



Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Shannon Carey <sc...@expedia.com>.
In fact, I can see all my job jar blobs and some checkpoint & job graph files in my configured "recovery.zookeeper.storageDir"… however, for some reason they didn't get restored when my new Flink cluster started up.



Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Shannon Carey <sc...@expedia.com>.
I recently added some better visibility into the metrics we're gathering from Flink. My Flink cluster died again due to the "Not enough free slots available to run the job" problem, and this time I can see that the number of registered task managers went down from 11 to 7, then waffled and only ever got back up to 10 (one short of the requested amount) before dropping to 0 just before the cluster died. This would seem to explain why there weren't sufficient slots (given that we were probably using them all or nearly all)… The metric of "running jobs" went down from 5 to 3 during this time period as well. So the problem seems to be loss of taskmanagers due to errors (not yet sure what exactly as I have to delve into logs).

The other thing I have to figure out is restoring the jobs… I thought that HA would start the jobs back up again if Flink died & I re-launched it, but that doesn't appear to be the case.
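
The slot shortage described above comes down to simple arithmetic, sketched here under stated assumptions. The task manager counts (11, 7, 10, 0) are the ones reported in this message; the number of slots per task manager (4) and the number of slots the running jobs need (42) are made-up values for illustration, and the check ignores details such as slot sharing groups.

```java
public class SlotCheckSketch {

    /** Total slots offered by the task managers that are currently registered. */
    static int totalSlots(int registeredTaskManagers, int slotsPerTaskManager) {
        return registeredTaskManagers * slotsPerTaskManager;
    }

    /** True if the registered task managers offer at least the required number of slots. */
    static boolean canSchedule(int registeredTaskManagers, int slotsPerTaskManager, int requiredSlots) {
        return totalSlots(registeredTaskManagers, slotsPerTaskManager) >= requiredSlots;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 4; // assumption, not stated in the thread
        int requiredSlots = 42;      // assumption: "all or nearly all" of 11 * 4 = 44 slots

        // Task manager counts as observed in the metrics: 11 -> 7 -> 10 -> 0
        for (int taskManagers : new int[] {11, 7, 10, 0}) {
            System.out.println(taskManagers + " task managers, "
                    + totalSlots(taskManagers, slotsPerTaskManager) + " slots, schedulable: "
                    + canSchedule(taskManagers, slotsPerTaskManager, requiredSlots));
        }
    }
}
```

With these assumed numbers, even 10 of 11 task managers is not enough, which would match restarts failing with "Not enough free slots" while the cluster is one container short.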



Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Stephan Ewen <se...@apache.org>.
Another thought on the container failure:

In 1.1, the user code is loaded dynamically whenever a Task is started.
That means that on every task restart the code is reloaded. For that to
work properly, class unloading needs to happen, or the permgen (Metaspace
on Java 8 and later) will eventually overflow.

Class unloading can be prevented if the user functions leave references
around as "GC roots", which may be threads, or references in registries,
etc.

In Flink 1.2, YARN will put the user code into the application classpath,
so code need not be reloaded on every restart. That should solve that
issue.
To "simulate" that behavior in Flink 1.1, put your application code jars
into the "lib" folder.

Best,
Stephan
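
A minimal sketch of the "threads as GC roots" case mentioned above, using only the JDK. PollingFunction and its open()/close() methods are hypothetical stand-ins for a user function's lifecycle hooks, not Flink API. The idea is that a thread started by user code keeps that code's classloader reachable for as long as it runs, so it should be stopped when the task shuts down.

```java
public class ThreadCleanupSketch {

    /** Hypothetical stand-in for a user function with open()/close() lifecycle hooks. */
    static class PollingFunction {
        private Thread poller;

        void open() {
            // The Runnable is a user-code class, so a live thread running it
            // keeps the user-code classloader reachable.
            poller = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(50); // placeholder for periodic work
                    }
                } catch (InterruptedException e) {
                    // Interrupted by close(): fall through and let the thread end.
                }
            }, "user-poller");
            poller.setDaemon(true);
            poller.start();
        }

        void close() {
            if (poller == null) {
                return;
            }
            // Stop the thread and wait for it, so it no longer acts as a GC root.
            poller.interrupt();
            try {
                poller.join(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            }
        }

        boolean isPollerAlive() {
            return poller != null && poller.isAlive();
        }
    }

    public static void main(String[] args) {
        PollingFunction fn = new PollingFunction();
        fn.open();
        System.out.println("alive after open: " + fn.isPollerAlive());
        fn.close();
        System.out.println("alive after close: " + fn.isPollerAlive());
    }
}
```

If close() were skipped, each task restart would leave one more thread behind, and with it one more copy of the reloaded classes.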



Re: Rapidly failing job eventually causes "Not enough free slots"

Posted by Yury Ruchin <yu...@gmail.com>.
Hi,

I faced a similar issue recently, and I hope sharing my findings will help.
The problem can be split into two parts:

*Source of container failures*
The logs you provided indicate that YARN kills the containers for exceeding
their memory limits. The important point here is that the memory limit covers
JVM heap memory + off-heap memory. So if off-heap memory usage is high, YARN
may kill containers even though JVM heap consumption is fine. To address
this, Flink reserves a share of container memory for off-heap use. How much
is reserved is controlled by the yarn.heap-cutoff-ratio and
yarn.heap-cutoff-min configuration settings.
By default, 25% of the requested container memory is reserved for off-heap
use. This seems to be a good starting point, but you should experiment and
tune it to suit your job.
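
As a rough illustration of how the cutoff splits a container, here is a
minimal Python sketch. It assumes the reserved amount is the larger of
(container memory * yarn.heap-cutoff-ratio) and yarn.heap-cutoff-min; the
function name and the 384 MB minimum are assumptions made for the example,
so check the values in your own flink-conf.yaml.

```python
def split_container_memory(container_mb, cutoff_ratio=0.25, cutoff_min_mb=384):
    """Split a YARN container's memory into JVM heap and off-heap reserve.

    Assumption: reserve = max(cutoff_min_mb, container_mb * cutoff_ratio).
    The name and the 384 MB default are illustrative, not taken from the
    thread.
    """
    if not 0 <= cutoff_ratio < 1:
        raise ValueError("cutoff_ratio must be in [0, 1)")
    reserved_mb = max(cutoff_min_mb, int(container_mb * cutoff_ratio))
    if reserved_mb >= container_mb:
        raise ValueError("cutoff leaves no memory for the JVM heap")
    heap_mb = container_mb - reserved_mb
    return heap_mb, reserved_mb


# Hypothetical 4096 MB container with the default 25% cutoff.
heap_mb, reserved_mb = split_container_memory(4096)
print(f"heap={heap_mb} MB, reserved for off-heap={reserved_mb} MB")

# Raising the ratio leaves more headroom before YARN's pmem check triggers.
heap_mb, reserved_mb = split_container_memory(4096, cutoff_ratio=0.4)
print(f"heap={heap_mb} MB, reserved for off-heap={reserved_mb} MB")
```

If containers are still killed with "Pmem limit exceeded" while the heap
looks healthy, raising the ratio trades heap for off-heap headroom.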

It's also worthwhile to figure out what is consuming the off-heap memory. Is
it Flink managed memory moved off heap (taskmanager.memory.off-heap = true)?
Is it some external library allocating something off heap? Is it your own
code?

*How Flink handles task manager failures*
Whenever a task manager fails, the Flink jobmanager decides whether it
should:
- reallocate the failed task manager container
- fail the application entirely
These decisions can be guided by configuration (
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
With default settings, the job manager reallocates task manager containers
up to the point when N failures have been observed, where N is the number of
requested task managers. After that the application is stopped.

According to the logs, you have a finite number in
yarn.maximum-failed-containers (11, as I can see from the logs; this may
be set by Flink if not provided explicitly). On the 12th container failure,
the jobmanager gives up and the application stops. I'm not sure why it keeps
reporting not enough slots after that point. In my experience this may
happen when a job uses up all the available slots, so that after a container
failure its tasks cannot be restarted in other (live) containers. But I
believe that once the decision to stop the application is made, there should
not be any further attempts to restart the job, hence no logs like those.
Hopefully, someone else will explain this to us :)

In my case I made the jobmanager restart containers indefinitely by setting
yarn.maximum-failed-containers = -1, so that a taskmanager failure never
results in application death. Note that this is unlikely to be a good choice
for a batch job.
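
The stop decision described above can be sketched in a few lines of Python.
This is only a model of the behaviour visible in the log message ("failed
containers (12) exceeded the maximum failed containers (11)") and of the -1
setting, not Flink's actual code; the function name is made up for the
example.

```python
def should_stop_application(failed_containers, max_failed_containers):
    """Model of the YARN session stop decision (illustrative only).

    Assumption: a negative limit (for example -1) disables the check, and
    otherwise the session stops once the failure count exceeds the limit.
    """
    if max_failed_containers < 0:
        return False  # never give up; keep reallocating containers
    return failed_containers > max_failed_containers


# With the limit of 11 seen in the logs, the 12th failure stops the session.
for failed in (11, 12):
    print(failed, should_stop_application(failed, 11))

# With -1, no number of failures stops the session.
print(should_stop_application(1000, -1))
```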

Regards,
Yury

2017-01-05 3:21 GMT+03:00 Shannon Carey <sc...@expedia.com>:

> In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice
> and I'm wondering if anyone has some insight about it.
>
> In both cases, we deployed a job that fails very frequently (within 15s-1m
> of launch). Eventually, the Flink cluster dies.
>
> The sequence of events looks something like this:
>
>    - bad job is launched
>    - bad job fails & is restarted many times (I didn't have the
>    "failure-rate" restart strategy configuration right)
>    - Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner
>    (SIGTERM handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>    - At this point, the YARN resource manager also logs the container
>    failure
>    - More logs: Container
>    ResourceID{resourceId='container_1481658997383_0003_01_000013'} failed.
>    Exit status: Pmem limit exceeded (-104)
>    - Diagnostics for container
>    ResourceID{resourceId='container_1481658997383_0003_01_000013'} in state
>    COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container
>    [pid=21246,containerID=container_1481658997383_0003_01_000013] is
>    running beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB
>    physical memory used; 9.6 GB of 28.1 GB virtual memory used. Killing
>    container.
>    Container killed on request. Exit code is 143
>    Container exited with a non-zero exit code 143
>    Total number of failed containers so far: 12
>    Stopping YARN session because the number of failed containers (12)
>    exceeded the maximum failed containers (11). This number is controlled by
>    the 'yarn.maximum-failed-containers' configuration setting. By default
>    its the number of requested containers.
>    - From here onward, the logs repeatedly show that jobs fail to restart
>    due to "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>    Not enough free slots available to run the job. You can decrease the
>    operator parallelism or increase the number of slots per TaskManager in the
>    configuration. Task to schedule: < Attempt #68 (Source: …) @ (unassigned) -
>    [SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in
>    sharing group < SlotSharingGroup [73191c171abfff61fb5102c161274145,
>    19596f7834805c8409c419f0edab1f1b] >. Resources available to scheduler:
>    Number of instances=0, total number of slots=0, available slots=0"
>    - Eventually, Flink stops for some reason (with another SIGTERM
>    message), presumably because of YARN
>
> Does anyone have an idea why a bad job repeatedly failing would eventually
> result in the Flink cluster dying?
>
> Any idea why I'd get "Pmem limit exceeded" or "Not enough free slots
> available to run the job"? The JVM heap usage and the free memory on the
> machines both look reasonable in my monitoring dashboards. Could it
> possibly be a memory leak due to classloading or something?
>
> Thanks for any help or suggestions you can provide! I am hoping that the
> "failure-rate" restart strategy will help avoid this issue in the future,
> but I'd also like to understand what's making the cluster die so that I can
> prevent it.
>
> -Shannon
>