You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com> on 2018/10/17 05:39:59 UTC

回复:Need help to understand memory consumption

Hi Julien,

Flink would manage the default 70% fraction of free memory in TaskManager for caching data efficiently, just as you mentioned in this article "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". These managed memories are persistent resident and referenced by the MemoryManager once allocated, so they will be resident in old region of JVM and will not be recycled by gc. To do so, wecan aovid the costs of creating and recycling the objects repeatedly.

The default parameter "taskmanager.memory.preallocate" is false, that means these managed memories will not be allocated during starting TaskManager. When the job is running, the related tasks would request these managed memories and then you will see the memory consumption is high. When the job is cancelled, these managed memories will be released to the MemoryManager but not recycled by gc, so you will see no changes in memory consumption. After you restart the TaskManager, the initial memory consumption is low because of lazy allocating via taskmanager.memory.preallocate=false.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Paul Lam <pa...@gmail.com>
发送时间:2018年10月17日(星期三) 12:31
收件人:jpreisner <jp...@free.fr>
抄 送:user <us...@flink.apache.org>
主 题:Re: Need help to understand memory consumption


Hi Julien,

AFAIK, streaming jobs put data objects on heap, so the it depends on the JVM GC to release the memory. 

Best,
Paul Lam

> 在 2018年10月12日,14:29,jpreisner@free.fr 写道:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 TaskManager)
> - I run N jobs per days. N may vary (one day : N=20, another day : N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connector.
> - Depending on a special event, a job can self-restart via the command : bin/flink cancel <JobID>
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after to have cancel the previous job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why when a job is canceled, the memory is not released?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ...
> 
> Thanks in advance,
> Julien<Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG>


Re: Need help to understand memory consumption

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Julien,

First of all, if you only run streaming jobs you do not need to worry about
"managed" memory.
Regardless of the state backend, that you use, you should remove state that
you don't need anymore. Otherwise, Flink will keep (and checkpoint) the
state forever.
There is no automatic garbage collection happening for state.

WIth Flink 1.6, Flink added TTL for state which has to be manually enabled
and which will remove state that has not been used since a configurable
time.
Note: in the current version state is only removed under certain
circumstances. The feature is still being completed and improved.

Best, Fabian

Am Mi., 17. Okt. 2018 um 10:57 Uhr schrieb <jp...@free.fr>:

> Hi all,
>
> Thanks for answers. I confirm I have streaming jobs.
>
> If I resume :
> - "When the job is cancelled, these managed memories will be released to
> the MemoryManager but not recycled by gc, so you will see no changes in
> memory consumption" is incorrect because MemoryManager functionnality is
> available only for batch jobs
> - My issue could be resolved by storing state backend in an embedded
> RocksDB instance on disk
>
> Is it exact ? If yes, does that mean that I have to purge old state
> backend in RocksDB ?
>
> Thanks a lot !
>
> Regards,
> Julien.
>
> ----- Mail original -----
> De: "Fabian Hueske" <fh...@gmail.com>
> À: "wangzhijiang999" <wa...@aliyun.com>
> Cc: "Paul Lam" <pa...@gmail.com>, jpreisner@free.fr, "user" <
> user@flink.apache.org>
> Envoyé: Mercredi 17 Octobre 2018 10:03:35
> Objet: Re: Need help to understand memory consumption
>
>
>
> Hi,
>
>
> As was said before, managed memory (as described in the blog post [1]) is
> only used for batch jobs.
> By default, managed memory is only lazily allocated, i.e., when a batch
> job is executed.
>
>
> Streaming jobs maintain state in state backends. Flink provides state
> backends that store the state on the JVM heap or in an embedded RocksDB
> instance on disk.
> The state backend can be chosen per job (the default backend stores state
> on the JVM heap).
>
>
> Best, Fabian
>
>
>
> [1]
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
>
>
> Am Mi., 17. Okt. 2018 um 08:53 Uhr schrieb Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com >:
>
>
>
>
> The operators for stream jobs will not use memory management which is only
> for batch jobs as you said.
> I guess the initial feedback is for batch jobs from the description?
>
>
> ------------------------------------------------------------------
> 发件人:Paul Lam < paullin3280@gmail.com >
> 发送时间:2018年10月17日(星期三) 14:35
> 收件人:Zhijiang(wangzhijiang999) < wangzhijiang999@aliyun.com >
> 抄 送:jpreisner < jpreisner@free.fr >; user < user@flink.apache.org >
> 主 题:Re: Need help to understand memory consumption
>
> Hi Zhijiang,
>
>
> Does the memory management apply to streaming jobs as well? A previous
> post[1] said that it can only be used in batch API, but I might miss some
> updates on that. Thank you!
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
>
>
> Best,
> Paul Lam
>
>
> 在 2018年10月17日,13:39,Zhijiang(wangzhijiang999) < wangzhijiang999@aliyun.com
> > 写道:
>
>
> Hi Julien,
>
>
> Flink would manage the default 70% fraction of free memory in TaskManager
> for caching data efficiently, just as you mentioned in this article "
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> ". These managed memories are persistent resident and referenced by the
> MemoryManager once allocated, so they will be resident in old region of JVM
> and will not be recycled by gc. To do so, wecan aovid the costs of creating
> and recycling the objects repeatedly.
>
>
> The default parameter "taskmanager.memory.preallocate" is false, that
> means these managed memories will not be allocated during starting
> TaskManager. When the job is running, the related tasks would request these
> managed memories and then you will see the memory consumption is high. When
> the job is cancelled, these managed memories will be released to the
> MemoryManager but not recycled by gc, so you will see no changes in memory
> consumption. After you restart the TaskManager, the initial memory
> consumption is low because of lazy allocating via
> taskmanager.memory.preallocate=false.
>
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Paul Lam < paullin3280@gmail.com >
> 发送时间:2018年10月17日(星期三) 12:31
> 收件人:jpreisner < jpreisner@free.fr >
> 抄 送:user < user@flink.apache.org >
> 主 题:Re: Need help to understand memory consumption
>
>
> Hi Julien,
>
> AFAIK, streaming jobs put data objects on heap, so the it depends on the
> JVM GC to release the memory.
>
> Best,
> Paul Lam
>
> > 在 2018年10月12日,14:29, jpreisner@free.fr 写道:
> >
> > Hi,
> >
> > My use case is :
> > - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager
> + 1 TaskManager)
> > - I run N jobs per days. N may vary (one day : N=20, another day : N=50,
> ...). All jobs are the same. They connect to Kafka topics and have two DB2
> connector.
> > - Depending on a special event, a job can self-restart via the command :
> bin/flink cancel <JobID>
> > - At the end of the day, I cancel all jobs
> > - Each VM is configured with 16Gb RAM
> > - Allocated memory configured for one taskmanager is 10Gb
> >
> > After several days, the memory saturates (we exceed 14Gb of used
> memory).
> >
> > I read the following posts but I did not succeed in understanding my
> problem :
> > -
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> > -
> http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> >
> > I did some tests on a machine (outside the cluster) with the top command
> and this is what I concluded (please see attached file - Flink_memory.PNG)
> :
> > - When a job is started and running, it consumes memory
> > - When a job is cancelled, a large part of the memory is still used
> > - When another job is started and running (after to have cancel the
> previous job), even more memory is consumed
> > - When I restart jobmanager and taskmanager, memory returns to normal
> >
> > Why when a job is canceled, the memory is not released?
> >
> > I added another attachment that represents the graph of a job -
> Graph.PNG.
> > If it can be useful we use MapFunction, FlatMapFunction, FilterFunction,
> triggers and windows, ...
> >
> > Thanks in advance,
> > Julien<Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG>
>
>
>
>
>

Re: Need help to understand memory consumption

Posted by jp...@free.fr.
Hi all,

Thanks for answers. I confirm I have streaming jobs.

If I resume :
- "When the job is cancelled, these managed memories will be released to the MemoryManager but not recycled by gc, so you will see no changes in memory consumption" is incorrect because MemoryManager functionnality is available only for batch jobs
- My issue could be resolved by storing state backend in an embedded RocksDB instance on disk

Is it exact ? If yes, does that mean that I have to purge old state backend in RocksDB ?

Thanks a lot !

Regards,
Julien.

----- Mail original -----
De: "Fabian Hueske" <fh...@gmail.com>
À: "wangzhijiang999" <wa...@aliyun.com>
Cc: "Paul Lam" <pa...@gmail.com>, jpreisner@free.fr, "user" <us...@flink.apache.org>
Envoyé: Mercredi 17 Octobre 2018 10:03:35
Objet: Re: Need help to understand memory consumption



Hi, 


As was said before, managed memory (as described in the blog post [1]) is only used for batch jobs. 
By default, managed memory is only lazily allocated, i.e., when a batch job is executed. 


Streaming jobs maintain state in state backends. Flink provides state backends that store the state on the JVM heap or in an embedded RocksDB instance on disk. 
The state backend can be chosen per job (the default backend stores state on the JVM heap). 


Best, Fabian 



[1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html 


Am Mi., 17. Okt. 2018 um 08:53 Uhr schrieb Zhijiang(wangzhijiang999) < wangzhijiang999@aliyun.com >: 




The operators for stream jobs will not use memory management which is only for batch jobs as you said. 
I guess the initial feedback is for batch jobs from the description? 


------------------------------------------------------------------ 
发件人:Paul Lam < paullin3280@gmail.com > 
发送时间:2018年10月17日(星期三) 14:35 
收件人:Zhijiang(wangzhijiang999) < wangzhijiang999@aliyun.com > 
抄 送:jpreisner < jpreisner@free.fr >; user < user@flink.apache.org > 
主 题:Re: Need help to understand memory consumption 

Hi Zhijiang, 


Does the memory management apply to streaming jobs as well? A previous post[1] said that it can only be used in batch API, but I might miss some updates on that. Thank you! 


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 


Best, 
Paul Lam 


在 2018年10月17日,13:39,Zhijiang(wangzhijiang999) < wangzhijiang999@aliyun.com > 写道: 


Hi Julien, 


Flink would manage the default 70% fraction of free memory in TaskManager for caching data efficiently, just as you mentioned in this article " https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html ". These managed memories are persistent resident and referenced by the MemoryManager once allocated, so they will be resident in old region of JVM and will not be recycled by gc. To do so, wecan aovid the costs of creating and recycling the objects repeatedly. 


The default parameter "taskmanager.memory.preallocate" is false, that means these managed memories will not be allocated during starting TaskManager. When the job is running, the related tasks would request these managed memories and then you will see the memory consumption is high. When the job is cancelled, these managed memories will be released to the MemoryManager but not recycled by gc, so you will see no changes in memory consumption. After you restart the TaskManager, the initial memory consumption is low because of lazy allocating via taskmanager.memory.preallocate=false. 


Best, 
Zhijiang 
------------------------------------------------------------------ 
发件人:Paul Lam < paullin3280@gmail.com > 
发送时间:2018年10月17日(星期三) 12:31 
收件人:jpreisner < jpreisner@free.fr > 
抄 送:user < user@flink.apache.org > 
主 题:Re: Need help to understand memory consumption 


Hi Julien, 

AFAIK, streaming jobs put data objects on heap, so the it depends on the JVM GC to release the memory. 

Best, 
Paul Lam 

> 在 2018年10月12日,14:29, jpreisner@free.fr 写道: 
> 
> Hi, 
> 
> My use case is : 
> - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 TaskManager) 
> - I run N jobs per days. N may vary (one day : N=20, another day : N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connector. 
> - Depending on a special event, a job can self-restart via the command : bin/flink cancel <JobID> 
> - At the end of the day, I cancel all jobs 
> - Each VM is configured with 16Gb RAM 
> - Allocated memory configured for one taskmanager is 10Gb 
> 
> After several days, the memory saturates (we exceed 14Gb of used memory). 
> 
> I read the following posts but I did not succeed in understanding my problem : 
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html 
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser 
> 
> I did some tests on a machine (outside the cluster) with the top command and this is what I concluded (please see attached file - Flink_memory.PNG) : 
> - When a job is started and running, it consumes memory 
> - When a job is cancelled, a large part of the memory is still used 
> - When another job is started and running (after to have cancel the previous job), even more memory is consumed 
> - When I restart jobmanager and taskmanager, memory returns to normal 
> 
> Why when a job is canceled, the memory is not released? 
> 
> I added another attachment that represents the graph of a job - Graph.PNG. 
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ... 
> 
> Thanks in advance, 
> Julien<Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG> 





Re: Need help to understand memory consumption

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

As was said before, managed memory (as described in the blog post [1]) is
only used for batch jobs.
By default, managed memory is only lazily allocated, i.e., when a batch job
is executed.

Streaming jobs maintain state in state backends. Flink provides state
backends that store the state on the JVM heap or in an embedded RocksDB
instance on disk.
The state backend can be chosen per job (the default backend stores state
on the JVM heap).

Best, Fabian

[1]
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html

Am Mi., 17. Okt. 2018 um 08:53 Uhr schrieb Zhijiang(wangzhijiang999) <
wangzhijiang999@aliyun.com>:

> The operators for stream jobs will not use memory management which is only
> for batch jobs as you said.
> I guess the initial feedback is for batch jobs from the description?
>
> ------------------------------------------------------------------
> 发件人:Paul Lam <pa...@gmail.com>
> 发送时间:2018年10月17日(星期三) 14:35
> 收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 抄 送:jpreisner <jp...@free.fr>; user <us...@flink.apache.org>
> 主 题:Re: Need help to understand memory consumption
>
> Hi Zhijiang,
>
> Does the memory management apply to streaming jobs as well? A previous
> post[1] said that it can only be used in batch API, but I might miss some
> updates on that. Thank you!
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
>
> Best,
> Paul Lam
>
> 在 2018年10月17日,13:39,Zhijiang(wangzhijiang999) <wa...@aliyun.com>
> 写道:
>
> Hi Julien,
>
> Flink would manage the default 70% fraction of free memory in TaskManager
> for caching data efficiently, just as you mentioned in this article "
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html".
> These managed memories are persistent resident and referenced by the
> MemoryManager once allocated, so they will be resident in old region of JVM
> and will not be recycled by gc. To do so, wecan aovid the costs of creating
> and recycling the objects repeatedly.
>
> The default parameter "taskmanager.memory.preallocate" is false, that
> means these managed memories will not be allocated during starting
> TaskManager. When the job is running, the related tasks would request these
> managed memories and then you will see the memory consumption is high. When
> the job is cancelled, these managed memories will be released to the
> MemoryManager but not recycled by gc, so you will see no changes in memory
> consumption. After you restart the TaskManager, the initial memory
> consumption is low because of lazy allocating
> via taskmanager.memory.preallocate=false.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Paul Lam <pa...@gmail.com>
> 发送时间:2018年10月17日(星期三) 12:31
> 收件人:jpreisner <jp...@free.fr>
> 抄 送:user <us...@flink.apache.org>
> 主 题:Re: Need help to understand memory consumption
>
>
> Hi Julien,
>
>
> AFAIK, streaming jobs put data objects on heap, so the it depends on the JVM GC to release the memory.
>
> Best,
> Paul Lam
>
> > 在 2018年10月12日,14:29,jpreisner@free.fr 写道:
> >
> > Hi,
> >
> > My use case is :
>
> > - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 TaskManager)
>
> > - I run N jobs per days. N may vary (one day : N=20, another day : N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connector.
>
> > - Depending on a special event, a job can self-restart via the command : bin/flink cancel <JobID>
> > - At the end of the day, I cancel all jobs
> > - Each VM is configured with 16Gb RAM
> > - Allocated memory configured for one taskmanager is 10Gb
> >
> > After several days, the memory saturates (we exceed 14Gb of used memory).
> >
>
> > I read the following posts but I did not succeed in understanding my problem :
> > -
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> > -
> http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> >
>
> > I did some tests on a machine (outside the cluster) with the top command and this is what I concluded (please see attached file - Flink_memory.PNG) :
> > - When a job is started and running, it consumes memory
> > - When a job is cancelled, a large part of the memory is still used
>
> > - When another job is started and running (after to have cancel the previous job), even more memory is consumed
> > - When I restart jobmanager and taskmanager, memory returns to normal
> >
> > Why when a job is canceled, the memory is not released?
> >
>
> > I added another attachment that represents the graph of a job - Graph.PNG.
>
> > If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ...
> >
> > Thanks in advance,
> > Julien<Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG>
>
>
>
>

回复:Need help to understand memory consumption

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
The operators for stream jobs will not use memory management which is only for batch jobs as you said.
I guess the initial feedback is for batch jobs from the description?
------------------------------------------------------------------
发件人:Paul Lam <pa...@gmail.com>
发送时间:2018年10月17日(星期三) 14:35
收件人:Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:jpreisner <jp...@free.fr>; user <us...@flink.apache.org>
主 题:Re: Need help to understand memory consumption

Hi Zhijiang,

Does the memory management apply to streaming jobs as well? A previous post[1] said that it can only be used in batch API, but I might miss some updates on that. Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam

在 2018年10月17日,13:39,Zhijiang(wangzhijiang999) <wa...@aliyun.com> 写道:
Hi Julien,

Flink would manage the default 70% fraction of free memory in TaskManager for caching data efficiently, just as you mentioned in this article "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". These managed memories are persistent resident and referenced by the MemoryManager once allocated, so they will be resident in old region of JVM and will not be recycled by gc. To do so, wecan aovid the costs of creating and recycling the objects repeatedly.

The default parameter "taskmanager.memory.preallocate" is false, that means these managed memories will not be allocated during starting TaskManager. When the job is running, the related tasks would request these managed memories and then you will see the memory consumption is high. When the job is cancelled, these managed memories will be released to the MemoryManager but not recycled by gc, so you will see no changes in memory consumption. After you restart the TaskManager, the initial memory consumption is low because of lazy allocating via taskmanager.memory.preallocate=false.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Paul Lam <pa...@gmail.com>
发送时间:2018年10月17日(星期三) 12:31
收件人:jpreisner <jp...@free.fr>
抄 送:user <us...@flink.apache.org>
主 题:Re: Need help to understand memory consumption


Hi Julien,

AFAIK, streaming jobs put data objects on heap, so the it depends on the JVM GC to release the memory. 

Best,
Paul Lam

> 在 2018年10月12日,14:29,jpreisner@free.fr 写道:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 TaskManager)
> - I run N jobs per days. N may vary (one day : N=20, another day : N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connector.
> - Depending on a special event, a job can self-restart via the command : bin/flink cancel <JobID>
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after to have cancel the previous job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why when a job is canceled, the memory is not released?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ...
> 
> Thanks in advance,
> Julien<Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG>




Re: Need help to understand memory consumption

Posted by Paul Lam <pa...@gmail.com>.
Hi Zhijiang,

Does the memory management apply to streaming jobs as well? A previous post[1] said that it can only be used in batch API, but I might miss some updates on that. Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam

> 在 2018年10月17日,13:39,Zhijiang(wangzhijiang999) <wa...@aliyun.com> 写道:
> 
> Hi Julien,
> 
> Flink would manage the default 70% fraction of free memory in TaskManager for caching data efficiently, just as you mentioned in this article "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html". These managed memories are persistent resident and referenced by the MemoryManager once allocated, so they will be resident in old region of JVM and will not be recycled by gc. To do so, wecan aovid the costs of creating and recycling the objects repeatedly.
> 
> The default parameter "taskmanager.memory.preallocate" is false, that means these managed memories will not be allocated during starting TaskManager. When the job is running, the related tasks would request these managed memories and then you will see the memory consumption is high. When the job is cancelled, these managed memories will be released to the MemoryManager but not recycled by gc, so you will see no changes in memory consumption. After you restart the TaskManager, the initial memory consumption is low because of lazy allocating via taskmanager.memory.preallocate=false.
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Paul Lam <pa...@gmail.com>
> 发送时间:2018年10月17日(星期三) 12:31
> 收件人:jpreisner <jp...@free.fr>
> 抄 送:user <us...@flink.apache.org>
> 主 题:Re: Need help to understand memory consumption
> 
> 
> Hi Julien,
> 
> AFAIK, streaming jobs put data objects on heap, so the it depends on the JVM GC to release the memory. 
> 
> Best,
> Paul Lam
> 
> > 在 2018年10月12日,14:29,jpreisner@free.fr 写道:
> > 
> > Hi,
> > 
> > My use case is : 
> > - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 1 TaskManager)
> > - I run N jobs per days. N may vary (one day : N=20, another day : N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connector.
> > - Depending on a special event, a job can self-restart via the command : bin/flink cancel <JobID>
> > - At the end of the day, I cancel all jobs
> > - Each VM is configured with 16Gb RAM
> > - Allocated memory configured for one taskmanager is 10Gb
> > 
> > After several days, the memory saturates (we exceed 14Gb of used memory).
> > 
> > I read the following posts but I did not succeed in understanding my problem :
> > - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> > - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> > 
> > I did some tests on a machine (outside the cluster) with the top command and this is what I concluded (please see attached file - Flink_memory.PNG) :
> > - When a job is started and running, it consumes memory
> > - When a job is cancelled, a large part of the memory is still used
> > - When another job is started and running (after to have cancel the previous job), even more memory is consumed
> > - When I restart jobmanager and taskmanager, memory returns to normal
> > 
> > Why when a job is canceled, the memory is not released?
> > 
> > I added another attachment that represents the graph of a job - Graph.PNG.
> > If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ...
> > 
> > Thanks in advance,
> > Julien<Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG>
>