You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/06/10 15:28:03 UTC
[flink] branch master updated: [FLINK-18188][Runtime] Derive JM
Off-Heap memory from configured Total Flink Memory minus JVM Heap
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4555ad9 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
4555ad9 is described below
commit 4555ad91b4bd0df8887c4b4eb3119cbae272e805
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Jun 9 16:47:04 2020 +0300
[FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
---
docs/ops/memory/mem_setup_master.md | 5 ++
docs/ops/memory/mem_setup_master.zh.md | 4 ++
.../jobmanager/JobManagerFlinkMemoryUtils.java | 56 ++++++++++++++++++----
.../jobmanager/JobManagerProcessUtilsTest.java | 38 ++++++++++++++-
4 files changed, 93 insertions(+), 10 deletions(-)
diff --git a/docs/ops/memory/mem_setup_master.md b/docs/ops/memory/mem_setup_master.md
index dbea142..d086427 100644
--- a/docs/ops/memory/mem_setup_master.md
+++ b/docs/ops/memory/mem_setup_master.md
@@ -89,6 +89,11 @@ There can be the following possible sources of *Off-heap* memory consumption:
* Flink framework dependencies (e.g. Akka network communication)
* User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks
+<span class="label label-info">Note</span> If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory)
+and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory
+will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap).
+The default value of the *Off-heap* memory option will be ignored.
+
## Local Execution
If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored.
diff --git a/docs/ops/memory/mem_setup_master.zh.md b/docs/ops/memory/mem_setup_master.zh.md
index dbea142..54b5165 100644
--- a/docs/ops/memory/mem_setup_master.zh.md
+++ b/docs/ops/memory/mem_setup_master.zh.md
@@ -89,6 +89,10 @@ There can be the following possible sources of *Off-heap* memory consumption:
* Flink framework dependencies (e.g. Akka network communication)
* User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks
+<span class="label label-info">Note</span> If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory)
+and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory
+will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap).
+The default value of the *Off-heap* memory option will be ignored.
## Local Execution
If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
index b22b6e6..9f4f13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
@@ -40,25 +40,63 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
public JobManagerFlinkMemory deriveFromRequiredFineGrainedOptions(Configuration config) {
MemorySize jvmHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.JVM_HEAP_MEMORY);
MemorySize offHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.OFF_HEAP_MEMORY);
- MemorySize derivedTotalFlinkMemorySize = jvmHeapMemorySize.add(offHeapMemorySize);
if (config.contains(JobManagerOptions.TOTAL_FLINK_MEMORY)) {
// derive network memory from total flink memory, and check against network min/max
MemorySize totalFlinkMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_FLINK_MEMORY);
- if (derivedTotalFlinkMemorySize.getBytes() != totalFlinkMemorySize.getBytes()) {
- throw new IllegalConfigurationException(String.format(
- "Sum of the configured JVM Heap Memory (%s) and the configured or default Off-heap Memory (%s) " +
- "exceeds the configured Total Flink Memory (%s). Please, make the configuration consistent " +
- "or configure only one option: either JVM Heap or Total Flink Memory.",
- jvmHeapMemorySize.toHumanReadableString(),
- offHeapMemorySize.toHumanReadableString(),
- totalFlinkMemorySize.toHumanReadableString()));
+ if (config.contains(JobManagerOptions.OFF_HEAP_MEMORY)) {
+ // off-heap memory is explicitly set by user
+ sanityCheckTotalFlinkMemory(totalFlinkMemorySize, jvmHeapMemorySize, totalFlinkMemorySize);
+ } else {
+ // off-heap memory is not explicitly set by user, derive it from Total Flink Memory and JVM Heap
+ offHeapMemorySize = deriveOffHeapMemory(jvmHeapMemorySize, totalFlinkMemorySize, offHeapMemorySize);
}
}
return createJobManagerFlinkMemory(config, jvmHeapMemorySize, offHeapMemorySize);
}
+ private static void sanityCheckTotalFlinkMemory(
+ MemorySize totalFlinkMemorySize,
+ MemorySize jvmHeapMemorySize,
+ MemorySize offHeapMemorySize) {
+ MemorySize derivedTotalFlinkMemorySize = jvmHeapMemorySize.add(offHeapMemorySize);
+ if (derivedTotalFlinkMemorySize.getBytes() != totalFlinkMemorySize.getBytes()) {
+ throw new IllegalConfigurationException(String.format(
+ "Sum of the configured JVM Heap Memory (%s) and the configured Off-heap Memory (%s) " +
+ "does not match the configured Total Flink Memory (%s). Please, make the configuration consistent " +
+ "or configure only one option: either JVM Heap or Total Flink Memory.",
+ jvmHeapMemorySize.toHumanReadableString(),
+ offHeapMemorySize.toHumanReadableString(),
+ totalFlinkMemorySize.toHumanReadableString()));
+ }
+ }
+
+ private static MemorySize deriveOffHeapMemory(
+ MemorySize jvmHeapMemorySize,
+ MemorySize totalFlinkMemorySize,
+ MemorySize defaultOffHeapMemorySize) {
+ if (totalFlinkMemorySize.getBytes() < jvmHeapMemorySize.getBytes()) {
+ throw new IllegalConfigurationException(String.format(
+ "The configured JVM Heap Memory (%s) exceeds the configured Total Flink Memory (%s). " +
+ "Please, make the configuration consistent or configure only one option: either JVM Heap " +
+ "or Total Flink Memory.",
+ jvmHeapMemorySize.toHumanReadableString(),
+ totalFlinkMemorySize.toHumanReadableString()));
+ }
+ MemorySize offHeapMemorySize = totalFlinkMemorySize.subtract(jvmHeapMemorySize);
+ if (offHeapMemorySize.getBytes() != defaultOffHeapMemorySize.getBytes()) {
+ LOG.info(
+ "The Off-Heap Memory size ({}) is derived the configured Total Flink Memory size ({}) minus " +
+ "the configured JVM Heap Memory size ({}). The default Off-Heap Memory size ({}) is ignored.",
+ offHeapMemorySize.toHumanReadableString(),
+ totalFlinkMemorySize.toHumanReadableString(),
+ jvmHeapMemorySize.toHumanReadableString(),
+ defaultOffHeapMemorySize.toHumanReadableString());
+ }
+ return offHeapMemorySize;
+ }
+
@Override
public JobManagerFlinkMemory deriveFromTotalFlinkMemory(Configuration config, MemorySize totalFlinkMemorySize) {
MemorySize offHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.OFF_HEAP_MEMORY);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index a7a51ec..840520f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -50,7 +50,7 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1536m");
@Rule
- public final TestLoggerResource testLoggerResource = new TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+ public final TestLoggerResource testLoggerResource = new TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.INFO);
public JobManagerProcessUtilsTest() {
super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, JobManagerOptions.TOTAL_PROCESS_MEMORY);
@@ -130,6 +130,42 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
validateFail(conf);
}
+ @Test
+ public void testJvmHeapExceedsTotalFlinkMemoryFailure() {
+ MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(100);
+ MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+
+ Configuration conf = new Configuration();
+ conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+ conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+ validateFail(conf);
+ }
+
+ @Test
+ public void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
+ MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+ MemorySize defaultOffHeap = JobManagerOptions.OFF_HEAP_MEMORY.defaultValue();
+ MemorySize expectedOffHeap = MemorySize.ofMebiBytes(100).add(defaultOffHeap);
+ MemorySize totalFlinkMemory = jvmHeap.add(expectedOffHeap);
+
+ Configuration conf = new Configuration();
+ conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+ conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+ JobManagerProcessSpec JobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig(conf);
+ assertThat(JobManagerProcessSpec.getJvmDirectMemorySize(), is(expectedOffHeap));
+ MatcherAssert.assertThat(
+ testLoggerResource.getMessages(),
+ hasItem(containsString(String.format(
+ "The Off-Heap Memory size (%s) is derived the configured Total Flink Memory size (%s) minus " +
+ "the configured JVM Heap Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
+ expectedOffHeap.toHumanReadableString(),
+ totalFlinkMemory.toHumanReadableString(),
+ jvmHeap.toHumanReadableString(),
+ defaultOffHeap.toHumanReadableString()))));
+ }
+
@Override
protected JobManagerProcessSpec processSpecFromConfig(Configuration config) {
return JobManagerProcessUtils.processSpecFromConfig(config);