Posted to user@flink.apache.org by Shubham Kumar <sh...@gmail.com> on 2020/09/19 16:06:40 UTC

Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

Hey everyone,

We deployed a streaming job using Flink 1.10.1 one month ago, and now YARN
containers are being killed due to memory issues very frequently. I am
trying to figure out the root cause of this issue in order to fix it.

We have a streaming job whose basic structure looks like this:
- Read 6 Kafka streams and combine stats from them (union) to form a single
stream
- stream.keyBy(MyKey)
             .window(TumblingEventTimeWindows.of(Time.minutes(1)))
             .reduce(MyReduceFunction)
             .addSink(new FlinkKafkaProducer011<>...);

We are using RocksDB as state backend. In flink-conf.yaml, we used
taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
one slot per task manager.

So, a taskmanager process gets started with the following memory components
as indicated in logs:

TaskExecutor container... will be started on ... with
> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
> 134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb (
> 3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=1024.000mb (1073741824 bytes)}.

which are as per defaults.

Now, after 25 days we started encountering the following yarn container
kill error:

> Association with remote system [akka.tcp://flink@...] has failed, address
> is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@...]] Caused by: [java.net.ConnectException: Connection
> refused: .../...:37679]
> 2020-09-09 00:53:24 INFO Closing TaskExecutor connection
> container_e193_1592804717489_149347_01_000011 because: [2020-09-09 00:53:
> 21.417]Container [pid=44371,containerID=container_e193_1592804717489_149347_01_000011]
> is running beyond physical memory limits. Current usage: 12.0 GB of 12 GB
> physical memory used; 14.4 GB of 25.2 GB virtual memory used. Killing
> container.
>

The YARN container size is 12GB because containers can only be allocated in
multiples of 3 GB (as per our settings).

Now, when YARN reallocates a new container, the program starts again
(without any issues), and after a few hours another container is killed
with the same error and the cycle repeats.
At this point, I want to debug the running process rather than experiment
with various memory config options, because I don't want to wait ~1 month
just to reproduce the error.

I have tried to figure out something from the Graphite metrics (see
attachments):
[1]: JVM Heap Memory (First 25 days) -> The memory goes up and, after
reaching a point, goes down and then starts going up again. (No container
kills were encountered until 09/09/2020; the program started on 14/08/2020.)
[2]: JVM Heap Memory (Recent) -> The memory is still going up, but it
doesn't even seem to reach its peak; the container is killed before that
(within a few hours).

From [1] and [2], I don't think JVM heap memory should keep rising, but if
JVM heap memory were the cause of the container kills, that would not
explain why the container in [2] is killed before the heap reaches its peak.

[3]: Direct Memory and Off heap Memory -> I don't think this is causing the
issue as most of the network buffers are free and off heap memory is well
below threshold.

At this point I thought RocksDB might be the culprit. I am aware that it is
bounded by the managed memory limit (I haven't changed any default config),
which is completely off-heap. But when I check the RocksDB size maintained
at this location:

/data_4/yarn-nm-local-dir/usercache/root/appcache/application_.../flink-io-a48d1127-58a1-41c5-a5f0-32c5180fe74d/job_0bff1881431b5774c3b496a98febed1a_op_WindowOperator_4061fbe16fb95459a1a8d207644e2e63__4_12__uuid_9fe0b2ff-24bc-4301-8044-3fe8e1b3a3a0/db/


It is only 17MB, which doesn't seem like much. I also took a heap dump of
the org.apache.flink.yarn.YarnTaskExecutorRunner process, but it shows only
30MB of data in use (not sure what I am missing here, as it doesn't match
the metrics shown by Flink).

However, top -p 'pid' (for the task manager process) does show RES = 10-12
GB for every container; it keeps going up until the container eventually
dies.
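
To follow the RES value over time without watching top by hand, one option
is to sample it from /proc. This is a minimal, Linux-only Python sketch; the
helper names are made up for illustration, and it reads the VmRSS field of
/proc/<pid>/status, which reports the resident set size in kB.

```python
import re
import time


def parse_vm_rss_kb(status_text):
    """Return the VmRSS value (in kB) from /proc/<pid>/status text, or None."""
    match = re.search(r"^VmRSS:\s+(\d+)\s+kB", status_text, re.MULTILINE)
    return int(match.group(1)) if match else None


def read_rss_kb(pid):
    """Read the current resident set size of a process in kB (Linux only)."""
    with open(f"/proc/{pid}/status") as f:
        return parse_vm_rss_kb(f.read())


def watch_rss(pid, interval_s=60, samples=5):
    """Print a timestamped RSS sample every interval_s seconds."""
    for _ in range(samples):
        rss_kb = read_rss_kb(pid)
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        if rss_kb is None:
            print(f"{stamp} pid={pid} rss=unavailable")
        else:
            # kB -> GiB: divide by 1024 twice.
            print(f"{stamp} pid={pid} rss={rss_kb / (1024 ** 2):.2f} GiB")
        time.sleep(interval_s)
```

For example, watch_rss(44371) would sample the task manager process from the
log above (44371 is the pid reported in the container kill message). Run it
as a user that is allowed to read that process's /proc entry.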

Has anyone encountered a similar situation, or does anyone have guidelines
I can follow to debug the issue? Let me know if there is anything else you
would like to know.

-- 
Thanks & Regards

Shubham Kumar

Re: [External Sender] Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

Posted by Shubham Kumar <sh...@gmail.com>.
@Kye, thanks for your suggestions. We are using the one-YARN-app-per-job
mode, and your point is still valid in Flink 1.10 as per the docs; it does
make sense to avoid dynamic classloading for such jobs. Also, we seem to
have enough off-heap memory for the resources you mentioned; what turned out
to be the issue was RocksDB memory usage (see below).

@Xintong, yes, I did try out the solution. The problem is definitely due to
RocksDB; however, it got solved by something else:

Short answer:
Setting this property in flink-conf.yaml solves the issue:

> state.backend.rocksdb.memory.managed: false


Long answer:

I observed that the OOM kills are a function of the number of restarts
rather than of how long the application has been running. With every
restart, the TaskManager's RES memory rises by 3.5GB (which is the Flink
managed memory allotted to the TM). So a TM could only withstand 2-3
restarts, after which OOM kills become frequent because the other TMs start
getting killed as well. I enabled the RocksDB block-cache usage metric, and
it rises until it reaches ~3.5 GB.
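
The restart arithmetic above can be sketched in a few lines of Python. The
3.5 GB growth per restart and the 12 GB container limit come from this
thread; the baseline RES values are hypothetical, since the thread does not
report what a freshly started TM uses.

```python
import math


def restarts_until_limit(baseline_gb, growth_per_restart_gb=3.5, limit_gb=12.0):
    """Number of restarts a TM survives while RES stays within limit_gb.

    Assumes RES starts at baseline_gb and grows by growth_per_restart_gb
    on every job restart, as observed in this thread.
    """
    if baseline_gb > limit_gb:
        return 0
    return math.floor((limit_gb - baseline_gb) / growth_per_restart_gb)


if __name__ == "__main__":
    # Hypothetical baselines, for illustration only.
    for baseline in (1.5, 4.0):
        print(baseline, "GB baseline ->", restarts_until_limit(baseline), "restarts")
```

With these hypothetical baselines the result is 3 and 2 restarts, which is
in line with the 2-3 restarts observed.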

At this point I tried setting

> containerized.taskmanager.env.MALLOC_ARENA_MAX : 2


This did seem to reduce the memory increase for a few of the task managers
(e.g. if there are 12 task managers, after a restart the RES memory
increases by 3.5 GB for only a few of them but not for the others), but it
didn't solve the issue for me, and OOM kills began to occur after 4-5
restarts. I also tried setting it to 1, but got similar results. I didn't
try using jemalloc because, as per the JIRA issue [1], the MALLOC_ARENA_MAX
solution is intended to produce similar results.

After setting state.backend.rocksdb.memory.managed: false, the TM RES
memory doesn't increase after any number of restarts. In fact, with the
RocksDB cache usage metrics enabled, it shows only ~100MB usage (of course,
it's dependent on the operators and state involved in the job). This might
indicate that Flink is trying to allot more memory than required for
RocksDB, and also that upon restart the RES memory rises again, which is
definitely not the intended behavior.

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham


On Fri, Sep 25, 2020 at 8:46 PM Kye Bae <ky...@capitalone.com> wrote:

> Not sure about Flink 1.10.x. Can share a few things up to Flink 1.9.x:
>
> 1. If your Flink cluster runs only one job, avoid using dynamic
> classloader for your job: start it from one of the Flink class paths. As of
> Flink 1.9.x, using the dynamic classloader results in the same classes
> getting loaded every time the job restarts (self-recovery or otherwise),
> and it could eat up all the JVM "off-heap" memory. Yarn seems to
> immediately kill the container when that happens.
>
> 2. Be sure to leave enough for the JVM "off-heap" area: GC + code cache +
> thread stacks + other Java internal resources end up there.
>
> -K
>
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>

-- 
Thanks & Regards

Shubham Kumar

Re: [External Sender] Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

Posted by Kye Bae <ky...@capitalone.com>.
Not sure about Flink 1.10.x. Can share a few things up to Flink 1.9.x:

1. If your Flink cluster runs only one job, avoid using dynamic classloader
for your job: start it from one of the Flink class paths. As of Flink
1.9.x, using the dynamic classloader results in the same classes getting
loaded every time the job restarts (self-recovery or otherwise), and it
could eat up all the JVM "off-heap" memory. Yarn seems to immediately kill
the container when that happens.

2. Be sure to leave enough for the JVM "off-heap" area: GC + code cache +
thread stacks + other Java internal resources end up there.

-K

Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

Posted by Xintong Song <to...@gmail.com>.
Hi Shubham,

Concerning FLINK-18712, thanks for the pointer. I was not aware of this
issue before. Running on Kubernetes or Yarn should not affect this issue. I
cannot tell whether this issue is the cause of your problem. The simplest
way to confirm this is probably to just try the solution and see if it
fixes your problem.

Given that it could take weeks to reproduce your problem, I would suggest
keeping track of the native memory usage with jemalloc and jeprof. This
should provide direct information about which component is using extra
memory.

Thank you~

Xintong Song



On Tue, Sep 22, 2020 at 10:42 PM Shubham Kumar <sh...@gmail.com>
wrote:

> Hi Xintong,
>
> Thanks for your insights, they are really helpful.
>
> I understand now that it is most certainly a native memory issue rather
> than a heap memory issue, and that Flink's Non-Heap metrics should not be
> trusted.
>
> Our job structure is so simple that I couldn't find any use of mmap
> memory or any other straightforward native memory leak. That leads me to
> believe that it could be a RocksDB issue, although you do make a valid
> point that there is an extra 2GB in the YARN container which should
> account for RocksDB's extra usage. I also saw this JIRA ticket for a
> RocksDB memory leak issue on Kubernetes and was wondering if the same
> could happen on YARN containers and is related to my issue [1]. Let me
> know what you think about this.
>
> Also, I tried running the same job using FsStateBackend (as a separate
> job) and it went fine, with no container kills and native memory not
> rising over time, which hints further towards RocksDB being the culprit.
> My state size in the checkpoint is around 1GB (I could probably even
> switch to FsStateBackend for this job, but I still want to figure out the
> case for RocksDB). I am using incremental checkpoints in my main job,
> which has the RocksDB state backend, if that's relevant.
>
> I read about native memory tracking and will probably go ahead and use
> Native Memory Tracking (NMT) or jemalloc to confirm the RocksDB issue and
> update here.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18712
>
> Thanks
> Shubham

Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

Posted by Shubham Kumar <sh...@gmail.com>.
Hi Xintong,

Thanks for your insights, they are really helpful.

I understand now that it is most certainly a native memory issue rather
than a heap memory issue, and that Flink's Non-Heap metrics should not be
trusted.

Our job structure is so simple that I couldn't find any use of mmap memory
or any other straightforward native memory leak. That leads me to believe
that it could be a RocksDB issue, although you do make a valid point that
there is an extra 2GB in the YARN container which should account for
RocksDB's extra usage. I also saw this JIRA ticket for a RocksDB memory
leak issue on Kubernetes and was wondering if the same could happen on YARN
containers and is related to my issue [1]. Let me know what you think about
this.

Also, I tried running the same job using FsStateBackend (as a separate job)
and it went fine, with no container kills and native memory not rising over
time, which hints further towards RocksDB being the culprit. My state size
in the checkpoint is around 1GB (I could probably even switch to
FsStateBackend for this job, but I still want to figure out the case for
RocksDB). I am using incremental checkpoints in my main job, which has the
RocksDB state backend, if that's relevant.

I read about native memory tracking and will probably go ahead and use
Native Memory Tracking (NMT) or jemalloc to confirm the RocksDB issue and
update here.

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham

On Mon, Sep 21, 2020 at 8:23 AM Xintong Song <to...@gmail.com> wrote:

> Hi Shubham,
>
> Java heap memory cannot cause a container to exceed its memory limit.
> Heap memory is strictly limited by the JVM `-Xmx` parameter. If the
> program does need more memory than the limit, it will run into a heap
> space OOM, rather than implicitly using more memory than the limit.
>
> Several things might lead to a container exceeding its memory limit:
> - RocksDB, whose memory control is based on estimation rather than a hard
> limit. This is one of the most common reasons for exceeding the limit.
> However, the extra memory usage introduced by RocksDB, if there is any,
> should usually not be too large. Given that your container size is 12GB
> and Flink only plans to use 10GB, I'm not sure whether RocksDB is the
> cause in your case. I've CC'ed @Yun Tang, who is the expert on Flink's
> RocksDB state backend.
> - Does your job use mmap memory? MMap memory, if used, is controlled by
> the operating system, not Flink. Depending on your Yarn cgroup
> configurations, some clusters would also count that as part of the
> container memory consumption.
> - Native memory leaks in user code dependencies and libraries could also
> cause the container to exceed its memory limit.
>
> Another suggestion: do not trust Flink's "Non-Heap" metrics. They are
> practically unhelpful and misleading. "Non-Heap" accounts for SOME of the
> non-heap memory usage, but NOT ALL of it. The community is working on a
> new set of metrics and Web UI for the task manager memory tuning.
>
> Thank you~
>
> Xintong Song
>
>
>

-- 
Thanks & Regards

Shubham Kumar

Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

Posted by Xintong Song <to...@gmail.com>.
Hi Shubham,

Java heap memory cannot cause the container to exceed its memory limit. Heap
memory is strictly limited by the JVM `-Xmx` parameter. If the program needs
more memory than the limit, it will run into a heap space OOM rather than
silently using more memory than the limit.

Several things might lead to the container exceeding its memory limit:
- RocksDB, whose memory control is based on estimation rather than a hard
limit. This is one of the most common reasons for exceeding container memory.
However, the extra memory usage introduced by RocksDB, if there is
any, is usually not too large. Given that your container size is 12GB and
Flink only plans to use 10GB, I'm not sure whether RocksDB is the cause in
your case. I've CC'ed @Yun Tang, who is the expert on Flink's RocksDB state
backend.
- Does your job use mmap memory? Mmap memory, if used, is controlled by the
operating system, not Flink. Depending on your Yarn cgroup configuration,
some clusters also count that as part of the container memory
consumption.
- Native memory leaks in user code dependencies and libraries could also
cause the container to exceed its memory limit.
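
To make the 10GB versus 12GB headroom concrete, here is a small Java sketch
of the arithmetic. The byte values are copied from the TaskExecutorProcessSpec
log line in the original post; the class name is hypothetical, and treating
the YARN limit of "12 GB" as 12 GiB is an assumption. The heap limit follows
the Flink 1.10 memory model as I understand it (-Xmx = framework heap + task
heap).

```java
/** Hypothetical helper: sums the memory components quoted in the original post. */
final class MemoryBudget {
    static final long GIB = 1024L * 1024L * 1024L;

    // Byte values taken from the TaskExecutorProcessSpec log line.
    static final long FRAMEWORK_HEAP = 134217728L;
    static final long FRAMEWORK_OFF_HEAP = 134217728L;
    static final long TASK_HEAP = 4429184954L;
    static final long TASK_OFF_HEAP = 0L;
    static final long NETWORK = 939524110L;
    static final long MANAGED = 3758096440L;   // budget handed to RocksDB by default
    static final long METASPACE = 268435456L;
    static final long JVM_OVERHEAD = 1073741824L;

    /** Total memory Flink plans for the task manager process. */
    static long processSize() {
        return FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + TASK_HEAP + TASK_OFF_HEAP
                + NETWORK + MANAGED + METASPACE + JVM_OVERHEAD;
    }

    /** The part bounded by -Xmx (assumed: framework heap + task heap). */
    static long heapLimit() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    /** Memory left in the container beyond what Flink plans to use. */
    static long headroom(long containerBytes) {
        return containerBytes - processSize();
    }

    public static void main(String[] args) {
        long container = 12L * GIB; // assumption: YARN's "12 GB" means 12 GiB
        System.out.println("process size (bytes): " + processSize());
        System.out.println("heap limit (bytes):   " + heapLimit());
        System.out.println("headroom (bytes):     " + headroom(container));
    }
}
```

The components add up to exactly 10737418240 bytes (10 GiB), so native memory
outside Flink's accounting has to grow by about 2 GiB before YARN kills the
container.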

Another suggestion: do not trust Flink's "Non-Heap" metrics. They are
practically useless and misleading. "Non-Heap" accounts for SOME of
the non-heap memory usage, but NOT ALL of it. The community is working on
a new set of metrics and Web UI for task manager memory tuning.
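
A minimal Java sketch of what the JVM itself can report, to illustrate the
gap. It assumes Flink's "Non-Heap" metric is backed by the standard
MemoryMXBean non-heap usage (metaspace, code cache and similar pools); the
class name is hypothetical. Direct and mapped buffers are reported by separate
BufferPoolMXBeans, and memory that native libraries such as RocksDB allocate
themselves appears in none of these numbers.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

/** Hypothetical helper: prints the memory the JVM can account for on its own. */
final class JvmVisibleMemory {

    /** Committed heap + committed non-heap + direct/mapped buffer pools, in bytes. */
    static long jvmVisibleBytes() {
        MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
        long total = mem.getHeapMemoryUsage().getCommitted()
                + mem.getNonHeapMemoryUsage().getCommitted();
        for (BufferPoolMXBean pool
                : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            // getMemoryUsed() returns -1 when no estimate is available
            total += Math.max(0L, pool.getMemoryUsed());
        }
        return total;
    }

    public static void main(String[] args) {
        MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
        System.out.println("heap:     " + mem.getHeapMemoryUsage());
        System.out.println("non-heap: " + mem.getNonHeapMemoryUsage());
        for (BufferPoolMXBean pool
                : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.println("buffer pool '" + pool.getName() + "': "
                    + pool.getMemoryUsed() + " bytes");
        }
        System.out.println("JVM-visible total: " + jvmVisibleBytes() + " bytes");
    }
}
```

Comparing this total with the RES value from top for the same process gives a
rough idea of how much memory is held outside the JVM's own bookkeeping.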

Thank you~

Xintong Song


