You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/06/09 15:09:24 UTC
[flink] 01/02: [FLINK-18154][Runtime] Check Total Flink Memory plus JVM metaspace is less than or equal to the configured Total Process Memory
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6498f4b76f6b7b4cfe99972e241a2257575155ea
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Jun 8 11:42:35 2020 +0300
[FLINK-18154][Runtime] Check Total Flink Memory plus JVM metaspace is less than or equal to the configured Total Process Memory
This closes #12520.
---
.../util/config/memory/ProcessMemoryUtils.java | 23 +++++++++++++---
.../TaskExecutorProcessUtilsTest.java | 11 ++++++++
.../jobmanager/JobManagerProcessUtilsTest.java | 8 ++++++
.../config/memory/ProcessMemoryUtilsTestBase.java | 31 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
index 16a6d2d..b154156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
@@ -143,9 +143,7 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
if (config.contains(options.getTotalProcessMemoryOption())) {
- MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
- MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
- sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
+ MemorySize jvmOverheadSize = deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(config, totalFlinkMemorySize);
jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
} else {
MemorySize jvmOverheadSize = deriveWithInverseFraction(
@@ -158,6 +156,25 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
return jvmMetaspaceAndOverhead;
}
+ private MemorySize deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(
+ Configuration config,
+ MemorySize totalFlinkMemorySize) {
+ MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
+ MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
+ MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
+ if (totalProcessMemorySize.getBytes() < totalFlinkAndJvmMetaspaceSize.getBytes()) {
+ throw new IllegalConfigurationException(
+ "The configured Total Process Memory size (%s) is less than the sum of the derived " +
+ "Total Flink Memory size (%s) and the configured or default JVM Metaspace size (%s).",
+ totalProcessMemorySize.toHumanReadableString(),
+ totalFlinkMemorySize.toHumanReadableString(),
+ jvmMetaspaceSize.toHumanReadableString());
+ }
+ MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
+ sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
+ return jvmOverheadSize;
+ }
+
private void sanityCheckJvmOverhead(
Configuration config,
MemorySize derivedJvmOverheadSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
index 4548162..487a25b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
@@ -584,6 +584,17 @@ public class TaskExecutorProcessUtilsTest extends ProcessMemoryUtilsTestBase<Tas
}
}
+ @Override
+ protected void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize) {
+ MemorySize componentSize = new MemorySize(totalFlinkMemorySize.getBytes() / 6);
+ configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, componentSize);
+ configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, componentSize);
+ configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, componentSize);
+ configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, componentSize);
+ configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, componentSize);
+ // network is the 6th component, fixed implicitly
+ }
+
private static Configuration configWithExplicitTaskHeapAndManageMem() {
final Configuration conf = new Configuration();
conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index 6bab438..a7a51ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -246,6 +246,14 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
}
}
+ @Override
+ protected void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize) {
+ MemorySize heapSize = new MemorySize(totalFlinkMemorySize.getBytes() / 2);
+ MemorySize offHeapSize = totalFlinkMemorySize.subtract(heapSize);
+ configuration.set(JobManagerOptions.JVM_HEAP_MEMORY, heapSize);
+ configuration.set(JobManagerOptions.OFF_HEAP_MEMORY, offHeapSize);
+ }
+
private static Configuration configWithExplicitJvmHeap() {
Configuration conf = new Configuration();
conf.set(JobManagerOptions.JVM_HEAP_MEMORY, JVM_HEAP_SIZE);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
index 15fdd74..301e2b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
@@ -126,6 +126,35 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex
}
@Test
+ public void testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithFineGrainedOptions() {
+ Configuration conf = getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200);
+ // Total Flink memory + JVM Metaspace > Total Process Memory (no space for JVM overhead)
+ MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150);
+ configWithFineGrainedOptions(conf, totalFlinkMemorySize);
+ validateFail(conf);
+ }
+
+ @Test
+ public void testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithTotalFlinkMemory() {
+ Configuration conf = getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200);
+ // Total Flink memory + JVM Metaspace > Total Process Memory (no space for JVM overhead)
+ MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150);
+ conf.set(options.getTotalFlinkMemoryOption(), totalFlinkMemorySize);
+ validateFail(conf);
+ }
+
+ private Configuration getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(
+ long jvmMetaspaceSizeMb,
+ long totalProcessMemorySizeMb) {
+ MemorySize jvmMetaspaceSize = MemorySize.ofMebiBytes(jvmMetaspaceSizeMb);
+ MemorySize totalProcessMemorySize = MemorySize.ofMebiBytes(totalProcessMemorySizeMb);
+ Configuration conf = new Configuration();
+ conf.set(options.getJvmOptions().getJvmMetaspaceOption(), jvmMetaspaceSize);
+ conf.set(options.getTotalProcessMemoryOption(), totalProcessMemorySize);
+ return conf;
+ }
+
+ @Test
public void testConfigJvmMetaspaceSize() {
MemorySize jvmMetaspaceSize = MemorySize.parse("50m");
@@ -303,6 +332,8 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex
protected abstract Configuration getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(Configuration config);
+ protected abstract void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize);
+
protected ConfigOption<MemorySize> getNewOptionForLegacyHeapOption() {
return newOptionForLegacyHeapOption;
}