Posted to user@flink.apache.org by Dimitris Vogiatzidakis <di...@gmail.com> on 2020/06/27 14:56:17 UTC

Optimal Flink configuration for Standalone cluster.

Hello,

I'm having a bit of trouble understanding the memory configuration in
Flink.
I'm using Flink 1.10.0 to read some datasets of edges and extract features.
I run this on a cluster of 4 nodes, each with 32 cores and 252 GB of RAM,
and I hope to expand it later by adding extra nodes to
the cluster.

So, regarding the configuration file (flink-conf.yaml):
a) I can't understand when I should use process.size and when flink.size.

b) From the detailed memory model I understand that direct memory is
included in both flink.size and process.size. However, if I don't specify
task.off-heap.size I get
"OutOfMemoryError: Direct buffer memory". Should I also change
off-heap.fraction?

c) When I fix this, I get a network buffers error. If I understand
correctly, flink.size * network fraction should fall between the network min and max.

I can't find the 'perfect' configuration regarding my setup. What is the
optimal way to use the system I have currently?

Thank you for your time.

Re: Optimal Flink configuration for Standalone cluster.

Posted by Dimitris Vogiatzidakis <di...@gmail.com>.
>
> It could really be specific to your workload. Some workload may need more
> heap memory while others may need more off-heap.
>
The main 'process' of my project creates a cross product of datasets and
then applies a function to all of them to extract some features.


> Alternatively, you can try to launch multiple TMs on one physical machine,
> to reduce the memory size of each TM process.

If I understand correctly, you mean that instead of 1 TM with 32 slots I
should have 4 TMs with 8 slots each? Otherwise I would exceed the total
number of cores and probably have tasks 'waiting' for other tasks to complete.
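
To make the arithmetic behind this question concrete, here is a minimal Python sketch. It assumes one slot per CPU core (a common rule of thumb, not something stated in this thread) and uses the 180 GB worst-case free memory figure mentioned elsewhere in this thread. `split_machine` is a hypothetical helper, not a Flink API; only the two printed option keys are real Flink options.

```python
# Hypothetical helper: splits one machine's resources across TaskManagers.
# Assumes one slot per CPU core, which is a rule of thumb, not a Flink requirement.

def split_machine(cores, usable_memory_gb, task_managers):
    """Return (slots per TM, memory per TM in GB) for an even split."""
    if task_managers <= 0 or cores % task_managers != 0:
        raise ValueError("cores must divide evenly across TaskManagers")
    return cores // task_managers, usable_memory_gb // task_managers


slots, memory_gb = split_machine(cores=32, usable_memory_gb=180, task_managers=4)
# Print the two flink-conf.yaml entries each TaskManager would use.
print(f"taskmanager.numberOfTaskSlots: {slots}")
print(f"taskmanager.memory.flink.size: {memory_gb}g")
```

With these inputs the sketch yields 8 slots and 45g per TaskManager, so the total number of slots per machine stays at 32.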


> BTW, what kind of workload are you running? Is it streaming or batch?
>
It is batch. I have a dataset of edges and am trying to extract features to
be used later for link prediction.

Thank you
-Dimitris Vogiatzidakis

On Mon, Jun 29, 2020 at 5:07 AM Xintong Song <to...@gmail.com> wrote:

> Since changing off-heap removes memory from '.task.heap.size' is there a
>> ratio that I should follow for better performance?
>>
> I don't think so. It could really be specific to your workload. Some
> workload may need more heap memory while others may need more off-heap.
>
> Also, my guess (since I am dealing with big datasets) is that the more
>> '.flink.size' I provide the better. Is that correct?
>>
> In most cases, yes. But it is also possible the other way around. Larger
> `.flink.size` usually also means larger JVM heap space, which reduces the
> frequency of GCs but increases the time cost on each GC (especially full
> GCs). On the other hand, if the memory is large enough, it could become the
> CPU resource rather than the memory that limits the performance. In such
> cases, increasing memory size won't give you more performance improvement
> but might introduce more GC overheads, thus harm the overall performance.
>
> In this particular cluster, since every Machine has 252 total DRAM and
>> worst case scenario 180GB is free to use, should I just say .flink.size:
>> 180g?
>>
> Not sure about this. I would suggest to avoid large task managers (say
> tens of GBs) unless absolutely necessary. Alternatively, you can try to
> launch multiple TMs on one physical machine, to reduce the memory size of
> each TM process.
>
> BTW, what kind of workload are you running? Is it streaming or batch?
>
>
> Thank you~
>
> Xintong Song

Re: Optimal Flink configuration for Standalone cluster.

Posted by Xintong Song <to...@gmail.com>.
>
> Since changing off-heap removes memory from '.task.heap.size' is there a
> ratio that I should follow for better performance?
>
I don't think so. It really depends on your workload. Some
workloads may need more heap memory while others may need more off-heap.

> Also, my guess (since I am dealing with big datasets) is that the more
> '.flink.size' I provide the better. Is that correct?
>
In most cases, yes. But it can also go the other way. A larger
`.flink.size` usually also means a larger JVM heap, which reduces the
frequency of GCs but increases the time cost of each GC (especially full
GCs). On the other hand, if the memory is large enough, the CPU rather than
the memory may become the resource that limits performance. In such
cases, increasing the memory size won't improve performance any further
but might introduce more GC overhead and thus harm overall performance.

> In this particular cluster, since every Machine has 252 total DRAM and
> worst case scenario 180GB is free to use, should I just say .flink.size:
> 180g?
>
Not sure about this. I would suggest avoiding large task managers (say tens
of GBs) unless absolutely necessary. Alternatively, you can try launching
multiple TMs on one physical machine to reduce the memory size of each TM
process.

BTW, what kind of workload are you running? Is it streaming or batch?


Thank you~

Xintong Song



On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <
dimitrisvogiatzidakis@gmail.com> wrote:

> Hi Xintong,
> Thank you for the quick response.
> doing 1), without increasing  'task.off-heap.size'  does not change the
> issue, but increasing the off-heap alone does.
> What should the off-heap value size be? Since changing off-heap removes
> memory from '.task.heap.size' is there a ratio that I should follow for
> better performance?
> Also, my guess (since I am dealing with big datasets) is that the more
> '.flink.size' I provide the better. Is that correct? Or will it add extra
> 'overhead' that could slow down my computations? In this particular
> cluster, since every Machine has 252 total DRAM and worst case scenario
> 180GB is free to use, should I just say .flink.size: 180g?
>
> Thank you very much and sorry if i'm asking silly questions.
> Dimitris Vogiatzidakis

Re: Optimal Flink configuration for Standalone cluster.

Posted by Dimitris Vogiatzidakis <di...@gmail.com>.
Hi Xintong,
Thank you for the quick response.
Doing 1) without increasing 'task.off-heap.size' does not resolve the
issue, but increasing the off-heap size alone does.
What should the off-heap size be? Since increasing off-heap takes
memory away from '.task.heap.size', is there a ratio that I should follow for
better performance?
Also, my guess (since I am dealing with big datasets) is that the more
'.flink.size' I provide the better. Is that correct? Or will it add extra
'overhead' that could slow down my computations? In this particular
cluster, since every machine has 252 GB of DRAM in total and in the worst case
180 GB is free to use, should I just set .flink.size: 180g?

Thank you very much and sorry if I'm asking silly questions.
Dimitris Vogiatzidakis

On Sun, Jun 28, 2020 at 5:25 AM Xintong Song <to...@gmail.com> wrote:

> Hi Dimitris,
>
> Regarding your questions.
> a) For standalone clusters, the recommended way is to use `.flink.size`
> rather than `.process.size`. `.process.size` includes JVM metaspace and
> overhead in addition to `.flink.size`, which usually do not really matter
> for standalone clusters.
> b) In case of direct OOMs, you should increase
> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
> c) Your understanding is correct. And you can also specify the absolute
> network memory size by setting the min and max to the same value.
>
> Here are my suggestions according to what you described.
>
>    1. Since both off-heap and network memory seems insufficient, I would
>    suggest to increase `taskmanager.memory.flink.size` to give your task
>    managers more memory in total.
>    2. If 1) does not work, I would suggest not to set the total memory
>    (means configure neither `.flink.size` nor `process.size`), but go for the
>    fine grained configuration where explicitly specify the individual memory
>    components. Flink will automatically add them up to derive the total memory.
>       1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>       you will also need to set `.task.heap.size` and `managed.size`.
>       2. If you don't know how many heap/managed memory to configure, you
>       can look for the configuration options in the beginning of the TM logs
>       (`-Dkey=value`). Those are the values derived from your current
>       configuration.
>
>
> Thank you~
>
> Xintong Song

Re: Optimal Flink configuration for Standalone cluster.

Posted by Xintong Song <to...@gmail.com>.
Hi Dimitris,

Regarding your questions:
a) For standalone clusters, the recommended way is to use `.flink.size`
rather than `.process.size`. `.process.size` includes JVM metaspace and
overhead in addition to `.flink.size`, and those usually do not really matter
for standalone clusters.
b) In case of direct OOMs, you should increase
`taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. You can also specify an absolute
network memory size by setting the min and max to the same value.
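
As a rough illustration of point c), the Python sketch below shows how the network memory could be derived from the total Flink memory. The defaults used (fraction 0.1, min 64 MB, max 1 GB) are the Flink 1.10 defaults for `taskmanager.memory.network.fraction`, `.min` and `.max` as far as I know; the helper `derive_network_memory` is hypothetical, and capping to the min/max range is my reading of the documented behaviour when only the total size is configured, not a copy of Flink's code.

```python
# Sketch of deriving network memory from total Flink memory.
# derive_network_memory is a hypothetical helper, not part of Flink.

MB = 1024 * 1024
GB = 1024 * MB


def derive_network_memory(flink_size, fraction=0.1, min_size=64 * MB, max_size=1 * GB):
    """Return fraction * flink_size, capped to the [min_size, max_size] range."""
    if min_size > max_size:
        raise ValueError("network min must not exceed network max")
    return max(min_size, min(int(flink_size * fraction), max_size))


for total_gb in (8, 180):
    net = derive_network_memory(total_gb * GB)
    print(f"flink.size={total_gb}g -> network={net // MB}m")

# Setting min == max pins the network memory to an absolute value.
pinned = derive_network_memory(180 * GB, min_size=4 * GB, max_size=4 * GB)
print(f"pinned network={pinned // MB}m")
```

Under these assumptions a 180g task manager would still get only 1 GB of network memory unless `taskmanager.memory.network.max` is raised, which may be one reason for a network buffers error on a large TM with many slots.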

Here are my suggestions according to what you described.

   1. Since both off-heap and network memory seem insufficient, I would
   suggest increasing `taskmanager.memory.flink.size` to give your task
   managers more memory in total.
   2. If 1) does not work, I would suggest not setting the total memory
   (that is, configuring neither `.flink.size` nor `.process.size`) and going for the
   fine-grained configuration, where you explicitly specify the individual memory
   components. Flink will automatically add them up to derive the total memory.
      1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you
      will also need to set `.task.heap.size` and `.managed.size`.
      2. If you don't know how much heap/managed memory to configure, you
      can look for the configuration options at the beginning of the TM logs
      (`-Dkey=value`). Those are the values derived from your current
      configuration.
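
The fine-grained approach in suggestion 2 can be sketched as follows. This is a minimal Python illustration, not a recommendation: the sizes passed in are made-up placeholders, `fine_grained_config` is a hypothetical helper, and the two framework constants are the Flink 1.10 defaults for `taskmanager.memory.framework.heap.size` and `taskmanager.memory.framework.off-heap.size` as far as I know. It renders the flink-conf.yaml entries and adds the components up the way the total Flink memory is described in the memory model.

```python
# Illustrative only: sizes are placeholders, not tuning advice.
# fine_grained_config is a hypothetical helper, not part of Flink.

FRAMEWORK_HEAP_MB = 128      # assumed default of taskmanager.memory.framework.heap.size
FRAMEWORK_OFF_HEAP_MB = 128  # assumed default of taskmanager.memory.framework.off-heap.size


def fine_grained_config(task_heap_mb, task_off_heap_mb, managed_mb, network_mb):
    """Return (flink-conf.yaml lines, total Flink memory in MB) for explicit components."""
    lines = [
        f"taskmanager.memory.task.heap.size: {task_heap_mb}m",
        f"taskmanager.memory.task.off-heap.size: {task_off_heap_mb}m",
        f"taskmanager.memory.managed.size: {managed_mb}m",
        # min == max pins the network memory to an absolute size
        f"taskmanager.memory.network.min: {network_mb}m",
        f"taskmanager.memory.network.max: {network_mb}m",
    ]
    total_mb = (FRAMEWORK_HEAP_MB + FRAMEWORK_OFF_HEAP_MB + task_heap_mb
                + task_off_heap_mb + managed_mb + network_mb)
    return lines, total_mb


lines, total_mb = fine_grained_config(
    task_heap_mb=16384, task_off_heap_mb=2048, managed_mb=12288, network_mb=2048)
print("\n".join(lines))
print(f"# derived total Flink memory: {total_mb}m")
```

Note that neither `taskmanager.memory.flink.size` nor `taskmanager.memory.process.size` appears in the rendered lines, in keeping with the suggestion above to leave the totals unset.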


Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
dimitrisvogiatzidakis@gmail.com> wrote:

> Hello,
>
> I'm having a bit of trouble understanding the memory configuration on
> flink.
> I'm using flink10.0.0 to read some datasets of edges and extract features.
> I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram
> each, and hopefully I could expand this as long as I can add extra nodes to
> the cluster.
>
> So regarding the configuration file (flink-conf.yaml).
> a) I can't understand when should I use process.size and when .flink.size.
>
> b) From the detailed memory model I understand that Direct memory is
> included in both of flink and process size, however if I don't specify
> off-heap.task.size I get
> " OutOfMemoryError: Direct buffer memory " .  Also should I change
> off-heap.fraction as well?
>
> c)When I fix this, I get network buffers error, which if I understand
> correctly,  flink.size * network fraction , should be between min and max.
>
> I can't find the 'perfect' configuration regarding my setup. What is the
> optimal way to use the system I have currently?
>
> Thank you for your time.
>
>
>