Posted to user@flink.apache.org by seeksst <se...@163.com> on 2020/03/23 03:16:55 UTC

backpressure and memory

Hi, everyone:


I'm a Flink SQL user, and the version is 1.8.2.
    Recently I have been confused about memory and backpressure. I have two jobs on YARN, and because they use more memory than allowed, they are frequently killed by YARN.
In the first job I have 3 taskmanagers and 6 parallelism, each with 8G of memory. It reads from Kafka and uses one-minute tumbling windows to calculate pv and uv. There are many aggregation dimensions, so to avoid data skew it groups by deviceId, TUMBLE(event_time, INTERVAL '1' MINUTE). My question is: the checkpoint is just 60MB and I give it 24G of memory in total, so why was it killed by YARN? I use RocksDB as the state backend, and the data volume is big, but I think that should trigger backpressure rather than an OOM, although backpressure doesn't trigger. The in pool usage is normally around 0.45.
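The query looks roughly like this (simplified, with placeholder table and column names):

    SELECT
      deviceId,
      TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
      COUNT(*) AS pv,
      COUNT(DISTINCT userId) AS uv
    FROM events
    GROUP BY
      deviceId,
      TUMBLE(event_time, INTERVAL '1' MINUTE)
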
The second job looks different: I use 2 taskmanagers and 4 parallelism, each with 20G of memory. I define an aggregate function to calculate complex data, grouped by date, hour, deviceId. It behaves like the first job, an OOM-style kill with no backpressure, but the problem is that when I read one day of data, only one taskmanager was killed by YARN, which confuses me. According to the dashboard I don't see any data skew, so why just one taskmanager?
Maybe these are the same question, maybe not, but I want to understand more about how memory is used in Flink, whether backpressure can stop the source and how it is triggered, and how RocksDB affects Flink.
Thanks for reading; any suggestions would be appreciated. Thank you.

Re: backpressure and memory

Posted by Arvid Heise <ar...@ververica.com>.
When YARN kills a job because of memory, it usually means that the job has
used more memory than it requested. Since Flink's memory model consists not
only of the Java on-heap memory but also some RocksDB off-heap memory, it's
usually harder to stay within the boundaries. These general shortcomings have
recently been addressed by FLIP-49 [1], available since 1.10.0. The solution
is usually to plan for more off-heap memory [2].
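For illustration only, a rough flink-conf.yaml sketch of that idea (the
values are placeholders, not a recommendation for your setup):

    # Flink 1.8/1.9: leave a larger cutoff of the container for off-heap
    # usage (RocksDB, JVM metaspace/overhead) instead of the JVM heap
    taskmanager.heap.size: 8192m
    containerized.heap-cutoff-ratio: 0.4   # default is 0.25

    # Flink 1.10+ (FLIP-49): size the whole container process and let RocksDB
    # use managed memory (state.backend.rocksdb.memory.managed is on by default)
    taskmanager.memory.process.size: 8192m
    taskmanager.memory.managed.fraction: 0.4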

You also refer to an OOM, which is very different from a YARN kill. A Java
OOM usually only happens if your program requires more on-heap memory than
Java can provide, for example if you load large models into memory. If you
actually experience an OOM, you should rather increase the on-heap memory
for your program (that is, decrease the on-heap memory that Flink manages).
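As a sketch of that trade-off (again with purely illustrative numbers):

    # Flink 1.8/1.9: lower the managed-memory fraction so more of the heap
    # is left for user code
    taskmanager.memory.fraction: 0.5   # default is 0.7

    # Flink 1.10+: the equivalent FLIP-49 option
    taskmanager.memory.managed.fraction: 0.3   # default is 0.4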

In all cases, checkpointing does something completely different from what
you expect [3]. In short, checkpointing periodically saves the state of your
application so that when you experience a failure (such as an OOM or a YARN
kill), the job can resume from the stored state.
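Just as a sketch, a typical checkpointing setup in flink-conf.yaml might
look like this (the directory is a placeholder; the checkpoint interval
itself is usually configured on the job):

    state.backend: rocksdb
    state.backend.incremental: true                    # incremental RocksDB checkpoints
    state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path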

Finally, backpressure does not have much to do with memory consumption
(except for network buffer usage, which is rather small in your
configuration). Backpressure means that one part of your application cannot
process data as fast as it arrives, so backpressure is only I/O- or
CPU-induced, never memory-induced. It would only come into play here by
prolonging checkpointing times (potentially leading to high checkpointing
latencies, which is always bad).

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html
