You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/06/09 15:09:24 UTC

[flink] 01/02: [FLINK-18154][Runtime] Check Total Flink Memory plus JVM metaspace is less than or equal to the configured Total Process Memory

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6498f4b76f6b7b4cfe99972e241a2257575155ea
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Mon Jun 8 11:42:35 2020 +0300

    [FLINK-18154][Runtime] Check Total Flink Memory plus JVM metaspace is less than or equal to the configured Total Process Memory
    
    This closes #12520.
---
 .../util/config/memory/ProcessMemoryUtils.java     | 23 +++++++++++++---
 .../TaskExecutorProcessUtilsTest.java              | 11 ++++++++
 .../jobmanager/JobManagerProcessUtilsTest.java     |  8 ++++++
 .../config/memory/ProcessMemoryUtilsTestBase.java  | 31 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
index 16a6d2d..b154156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
@@ -143,9 +143,7 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
 		MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
 		JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
 		if (config.contains(options.getTotalProcessMemoryOption())) {
-			MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
-			MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
-			sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
+			MemorySize jvmOverheadSize = deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(config, totalFlinkMemorySize);
 			jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
 		} else {
 			MemorySize jvmOverheadSize = deriveWithInverseFraction(
@@ -158,6 +156,25 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
 		return jvmMetaspaceAndOverhead;
 	}
 
+	private MemorySize deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(
+			Configuration config,
+			MemorySize totalFlinkMemorySize) {
+		MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
+		MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
+		MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
+		if (totalProcessMemorySize.getBytes() < totalFlinkAndJvmMetaspaceSize.getBytes()) {
+			throw new IllegalConfigurationException(
+				"The configured Total Process Memory size (%s) is less than the sum of the derived " +
+					"Total Flink Memory size (%s) and the configured or default JVM Metaspace size  (%s).",
+				totalProcessMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString(),
+				jvmMetaspaceSize.toHumanReadableString());
+		}
+		MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
+		sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
+		return jvmOverheadSize;
+	}
+
 	private void sanityCheckJvmOverhead(
 			Configuration config,
 			MemorySize derivedJvmOverheadSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
index 4548162..487a25b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
@@ -584,6 +584,17 @@ public class TaskExecutorProcessUtilsTest extends ProcessMemoryUtilsTestBase<Tas
 		}
 	}
 
+	@Override
+	protected void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize) {
+		MemorySize componentSize = new MemorySize(totalFlinkMemorySize.getBytes() / 6);
+		configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, componentSize);
+		configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, componentSize);
+		configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, componentSize);
+		configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, componentSize);
+		configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, componentSize);
+		// network is the 6th component, fixed implicitly
+	}
+
 	private static Configuration configWithExplicitTaskHeapAndManageMem() {
 		final Configuration conf = new Configuration();
 		conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index 6bab438..a7a51ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -246,6 +246,14 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 		}
 	}
 
+	@Override
+	protected void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize) {
+		MemorySize heapSize = new MemorySize(totalFlinkMemorySize.getBytes() / 2);
+		MemorySize offHeapSize = totalFlinkMemorySize.subtract(heapSize);
+		configuration.set(JobManagerOptions.JVM_HEAP_MEMORY, heapSize);
+		configuration.set(JobManagerOptions.OFF_HEAP_MEMORY, offHeapSize);
+	}
+
 	private static Configuration configWithExplicitJvmHeap() {
 		Configuration conf = new Configuration();
 		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, JVM_HEAP_SIZE);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
index 15fdd74..301e2b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
@@ -126,6 +126,35 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex
 	}
 
 	@Test
+	public void testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithFineGrainedOptions() {
+		Configuration conf = getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200);
+		// Total Flink memory + JVM Metaspace > Total Process Memory (no space for JVM overhead)
+		MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150);
+		configWithFineGrainedOptions(conf, totalFlinkMemorySize);
+		validateFail(conf);
+	}
+
+	@Test
+	public void testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithTotalFlinkMemory() {
+		Configuration conf = getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200);
+		// Total Flink memory + JVM Metaspace > Total Process Memory (no space for JVM overhead)
+		MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150);
+		conf.set(options.getTotalFlinkMemoryOption(), totalFlinkMemorySize);
+		validateFail(conf);
+	}
+
+	private Configuration getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(
+			long jvmMetaspaceSizeMb,
+			long totalProcessMemorySizeMb) {
+		MemorySize jvmMetaspaceSize = MemorySize.ofMebiBytes(jvmMetaspaceSizeMb);
+		MemorySize totalProcessMemorySize = MemorySize.ofMebiBytes(totalProcessMemorySizeMb);
+		Configuration conf = new Configuration();
+		conf.set(options.getJvmOptions().getJvmMetaspaceOption(), jvmMetaspaceSize);
+		conf.set(options.getTotalProcessMemoryOption(), totalProcessMemorySize);
+		return conf;
+	}
+
+	@Test
 	public void testConfigJvmMetaspaceSize() {
 		MemorySize jvmMetaspaceSize = MemorySize.parse("50m");
 
@@ -303,6 +332,8 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex
 
 	protected abstract Configuration getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(Configuration config);
 
+	protected abstract void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize);
+
 	protected ConfigOption<MemorySize> getNewOptionForLegacyHeapOption() {
 		return newOptionForLegacyHeapOption;
 	}