Posted to user@flink.apache.org by "Hailu, Andreas [Engineering]" <An...@gs.com> on 2021/08/26 01:06:50 UTC

1.9 to 1.11 Managed Memory Migration Questions

Hi folks,

We're about halfway through migrating our YARN batch processing applications from Flink 1.9 to 1.11, and are currently tackling the memory configuration migration.

Our test application's sink failed with the following exception while writing to HDFS:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

We submit our applications through a Flink YARN session with -ytm, -yjm, etc. We don't have any memory configuration options set aside from 'taskmanager.network.bounded-blocking-subpartition-type: file', which I see is now deprecated and replaced with a new option that defaults to 'file' (which works for us!), so nearly everything else is at its default.
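
For concreteness, the one non-default option in our configuration, together with what I believe is its 1.11 successor key (everything else, including all memory options, is left at defaults):

    # 1.9:
    taskmanager.network.bounded-blocking-subpartition-type: file
    # 1.11 replacement (which, as noted above, also defaults to 'file'):
    taskmanager.network.blocking-shuffle.type: file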

We haven't made any configuration changes thus far, as we're still combing through the migration instructions, but I did have some questions about what I observed.

1.     I observed that an application run with "-ytm 12288" on 1.9 receives 8.47GB of JVM Heap space and 5.95GB of Flink Managed Memory (as reported by the ApplicationMaster), whereas on 1.11 it receives 5.79GB of JVM Heap space and 4.30GB of Flink Managed Memory. Why does this ~30% memory reduction happen?

2.     Piggybacking off point 1, on 1.9 we were not explicitly setting off-heap memory parameters. How would you suggest discerning which properties we should have a look at?

Best,
Andreas


RE: 1.9 to 1.11 Managed Memory Migration Questions

Posted by "Hailu, Andreas [Engineering]" <An...@gs.com>.
Thanks Caizhi, this was very helpful.

// ah


Re: 1.9 to 1.11 Managed Memory Migration Questions

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

I've read the first mail again and discovered that the direct memory OOM occurs when the job is writing to the sink, not when data is being transferred between tasks over the network.

I'm not familiar with HDFS, but I would guess that writing to HDFS requires some direct memory. A detailed stack trace might prove me right.

> How does this relate with the overall TaskManager process memory?

See [1] for the detailed memory model. In short, task.off-heap.size is part of the overall task manager process memory and is dedicated to user code (by user code I mean UDFs, sources, sinks, etc.). Network off-heap memory is only used for shuffling data between tasks and will not help with writing to HDFS; managed memory is only used by operators such as joins and aggregations.
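
In flink-conf.yaml terms, the main components of the 1.11 model from [1] look roughly like this (key names are from the 1.11 docs; the values below are the defaults/placeholders as I remember them, so please verify against [1]):

    taskmanager.memory.process.size:             ...    # total TM process memory (I believe this is what -ytm maps to in 1.11)
    taskmanager.memory.framework.heap.size:      128m   # Flink framework, on-heap
    taskmanager.memory.framework.off-heap.size:  128m   # Flink framework, direct memory
    taskmanager.memory.task.heap.size:           ...    # user code, on-heap (derived if not set)
    taskmanager.memory.task.off-heap.size:       0b     # user code, direct/native memory
    taskmanager.memory.network.fraction:         0.1    # shuffle buffers between tasks
    taskmanager.memory.managed.fraction:         0.4    # sorting, joins, aggregations
    taskmanager.memory.jvm-metaspace.size:       256m
    taskmanager.memory.jvm-overhead.fraction:    0.1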

> is there a way to make this scale along with it for jobs that process larger batches of data?

There is currently no way to do so, but I don't see why it is needed: for larger batches of data (I suppose you will increase the parallelism) more task managers will be allocated by YARN (provided you're not increasing the number of slots each task manager provides). All of these memory settings are per task manager, which means that as the number of task managers scales, the total amount of off-heap memory naturally scales with it.
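
As a hypothetical illustration (numbers made up), with taskmanager.numberOfTaskSlots: 4 and 1g of task off-heap per task manager:

    parallelism 32  ->  32 / 4 =  8 TMs  ->   8 x 1g =  8g task off-heap in total
    parallelism 64  ->  64 / 4 = 16 TMs  ->  16 x 1g = 16g task off-heap in total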

> What does having a default value of 0 bytes mean?

Most sources and sinks do not use direct memory, so a default value of 0 is reasonable. Only when the user needs it (for example, in this case you're using an HDFS sink) does this part of memory need to be allocated.
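
A sketch of how it could be supplied without editing flink-conf.yaml, assuming the YARN CLIs accept dynamic properties the way I remember (please double-check the exact flag syntax):

    # when starting the session:
    yarn-session.sh ... -Dtaskmanager.memory.task.off-heap.size=1g
    # or per job in yarn-cluster mode:
    flink run -m yarn-cluster ... -yDtaskmanager.memory.task.off-heap.size=1g <job>.jar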

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model



RE: 1.9 to 1.11 Managed Memory Migration Questions

Posted by "Hailu, Andreas [Engineering]" <An...@gs.com>.
Hi Caizhi, thanks for responding.

The networking keys you suggested didn't help, but I found that setting 'taskmanager.memory.task.off-heap.size' to '1g' led to a successful job. I can see in this property's documentation [1] that the default value is 0 bytes.

Task Off-Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.
0 bytes (default)

From the mem setup page: [2]

The off-heap memory which is allocated by user code should be accounted for in task off-heap memory.
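
For reference, the single entry we added (everything else remains at defaults):

    # added to our 1.11 configuration
    taskmanager.memory.task.off-heap.size: 1g   # counted toward the JVM max direct memory limit, per [1]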

A few points of clarification, if you would:

1.     How does this relate with the overall TaskManager process memory, and is there a way to make this scale along with it for jobs that process larger batches of data?

2.     What does having a default value of 0 bytes mean?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-task-off-heap-size
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#configure-off-heap-memory-direct-or-native

// ah


Re: 1.9 to 1.11 Managed Memory Migration Questions

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Why does this ~30% memory reduction happen?


I don't know how memory was calculated in Flink 1.9, but this 1.11 memory allocation result is reasonable. This is because managed memory, network memory and JVM overhead memory in 1.11 all have their default sizes or fractions (managed memory 40%, network memory 10% with a 1g max, JVM overhead memory 10% with a 1g max; see [1]), while heap memory doesn't. So a 5.8GB heap (roughly 12G - 2G - 40%) and 4.3G of managed memory (roughly 40%) is explainable.
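
As a rough worked example for -ytm 12288 under the 1.11 defaults (approximate arithmetic only; the exact split depends on rounding and on which figure the web UI reports):

    total process size                               12288 MB
    - JVM overhead: min(10% of process, 1g cap)     - 1024 MB
    - JVM metaspace (default, if I recall, 256m)    -  256 MB
    = total Flink memory                             11008 MB
      network = min(10% of 11008, 1g cap)          ~  1024 MB
      managed = 40% of 11008                       ~  4403 MB  (~4.3 GB, matching what you see)
      framework heap + framework off-heap          ~   256 MB
      task heap = remainder                        ~  5325 MB  (JVM heap = framework + task heap ~ 5.3 GB,
                                                                in the ballpark of the reported figure)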

How would you suggest discerning what properties we should have a look at?


Network shuffle memory now has its own configuration key, taskmanager.memory.network.fraction (along with ...network.min and ...network.max). Also see [1] and [2] for more keys related to the task manager's memory.
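
If it helps, the 1.11 defaults for those keys are, as far as I remember (please verify against [1]):

    taskmanager.memory.network.fraction: 0.1
    taskmanager.memory.network.min: 64mb
    taskmanager.memory.network.max: 1gb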

[1]
https://github.com/apache/flink/blob/release-1.11/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model
