You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/06/10 15:28:03 UTC

[flink] branch master updated: [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4555ad9  [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
4555ad9 is described below

commit 4555ad91b4bd0df8887c4b4eb3119cbae272e805
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Jun 9 16:47:04 2020 +0300

    [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
---
 docs/ops/memory/mem_setup_master.md                |  5 ++
 docs/ops/memory/mem_setup_master.zh.md             |  4 ++
 .../jobmanager/JobManagerFlinkMemoryUtils.java     | 56 ++++++++++++++++++----
 .../jobmanager/JobManagerProcessUtilsTest.java     | 38 ++++++++++++++-
 4 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/docs/ops/memory/mem_setup_master.md b/docs/ops/memory/mem_setup_master.md
index dbea142..d086427 100644
--- a/docs/ops/memory/mem_setup_master.md
+++ b/docs/ops/memory/mem_setup_master.md
@@ -89,6 +89,11 @@ There can be the following possible sources of *Off-heap* memory consumption:
 * Flink framework dependencies (e.g. Akka network communication)
 * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks
 
+<span class="label label-info">Note</span> If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory)
+and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory
+will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap).
+The default value of the *Off-heap* memory option will be ignored.
+
 ## Local Execution
 
 If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored.
diff --git a/docs/ops/memory/mem_setup_master.zh.md b/docs/ops/memory/mem_setup_master.zh.md
index dbea142..54b5165 100644
--- a/docs/ops/memory/mem_setup_master.zh.md
+++ b/docs/ops/memory/mem_setup_master.zh.md
@@ -89,6 +89,10 @@ There can be the following possible sources of *Off-heap* memory consumption:
 * Flink framework dependencies (e.g. Akka network communication)
 * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks
 
+<span class="label label-info">Note</span> If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory)
+and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory
+will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap).
+The default value of the *Off-heap* memory option will be ignored.
 ## Local Execution
 
 If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
index b22b6e6..9f4f13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
@@ -40,25 +40,63 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
 	public JobManagerFlinkMemory deriveFromRequiredFineGrainedOptions(Configuration config) {
 		MemorySize jvmHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.JVM_HEAP_MEMORY);
 		MemorySize offHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.OFF_HEAP_MEMORY);
-		MemorySize derivedTotalFlinkMemorySize = jvmHeapMemorySize.add(offHeapMemorySize);
 
 		if (config.contains(JobManagerOptions.TOTAL_FLINK_MEMORY)) {
 			// derive network memory from total flink memory, and check against network min/max
 			MemorySize totalFlinkMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_FLINK_MEMORY);
-			if (derivedTotalFlinkMemorySize.getBytes() != totalFlinkMemorySize.getBytes()) {
-				throw new IllegalConfigurationException(String.format(
-					"Sum of the configured JVM Heap Memory (%s) and the configured or default Off-heap Memory (%s) " +
-						"exceeds the configured Total Flink Memory (%s). Please, make the configuration consistent " +
-						"or configure only one option: either JVM Heap or Total Flink Memory.",
-					jvmHeapMemorySize.toHumanReadableString(),
-					offHeapMemorySize.toHumanReadableString(),
-					totalFlinkMemorySize.toHumanReadableString()));
+			if (config.contains(JobManagerOptions.OFF_HEAP_MEMORY)) {
+				// off-heap memory is explicitly set by user
+				sanityCheckTotalFlinkMemory(totalFlinkMemorySize, jvmHeapMemorySize, totalFlinkMemorySize);
+			} else {
+				// off-heap memory is not explicitly set by user, derive it from Total Flink Memory and JVM Heap
+				offHeapMemorySize = deriveOffHeapMemory(jvmHeapMemorySize, totalFlinkMemorySize, offHeapMemorySize);
 			}
 		}
 
 		return createJobManagerFlinkMemory(config, jvmHeapMemorySize, offHeapMemorySize);
 	}
 
+	private static void sanityCheckTotalFlinkMemory(
+			MemorySize totalFlinkMemorySize,
+			MemorySize jvmHeapMemorySize,
+			MemorySize offHeapMemorySize) {
+		MemorySize derivedTotalFlinkMemorySize = jvmHeapMemorySize.add(offHeapMemorySize);
+		if (derivedTotalFlinkMemorySize.getBytes() != totalFlinkMemorySize.getBytes()) {
+			throw new IllegalConfigurationException(String.format(
+				"Sum of the configured JVM Heap Memory (%s) and the configured Off-heap Memory (%s) " +
+					"does not match the configured Total Flink Memory (%s). Please, make the configuration consistent " +
+					"or configure only one option: either JVM Heap or Total Flink Memory.",
+				jvmHeapMemorySize.toHumanReadableString(),
+				offHeapMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString()));
+		}
+	}
+
+	private static MemorySize deriveOffHeapMemory(
+			MemorySize jvmHeapMemorySize,
+			MemorySize totalFlinkMemorySize,
+			MemorySize defaultOffHeapMemorySize) {
+		if (totalFlinkMemorySize.getBytes() < jvmHeapMemorySize.getBytes()) {
+			throw new IllegalConfigurationException(String.format(
+				"The configured JVM Heap Memory (%s) exceeds the configured Total Flink Memory (%s). " +
+					"Please, make the configuration consistent or configure only one option: either JVM Heap " +
+					"or Total Flink Memory.",
+				jvmHeapMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString()));
+		}
+		MemorySize offHeapMemorySize = totalFlinkMemorySize.subtract(jvmHeapMemorySize);
+		if (offHeapMemorySize.getBytes() != defaultOffHeapMemorySize.getBytes()) {
+			LOG.info(
+				"The Off-Heap Memory size ({}) is derived the configured Total Flink Memory size ({}) minus " +
+					"the configured JVM Heap Memory size ({}). The default Off-Heap Memory size ({}) is ignored.",
+				offHeapMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString(),
+				jvmHeapMemorySize.toHumanReadableString(),
+				defaultOffHeapMemorySize.toHumanReadableString());
+		}
+		return offHeapMemorySize;
+	}
+
 	@Override
 	public JobManagerFlinkMemory deriveFromTotalFlinkMemory(Configuration config, MemorySize totalFlinkMemorySize) {
 		MemorySize offHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.OFF_HEAP_MEMORY);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index a7a51ec..840520f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -50,7 +50,7 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 	private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1536m");
 
 	@Rule
-	public final TestLoggerResource testLoggerResource = new TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+	public final TestLoggerResource testLoggerResource = new TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.INFO);
 
 	public JobManagerProcessUtilsTest() {
 		super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, JobManagerOptions.TOTAL_PROCESS_MEMORY);
@@ -130,6 +130,42 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 		validateFail(conf);
 	}
 
+	@Test
+	public void testJvmHeapExceedsTotalFlinkMemoryFailure() {
+		MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(100);
+		MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+
+		Configuration conf = new Configuration();
+		conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+		validateFail(conf);
+	}
+
+	@Test
+	public void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
+		MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+		MemorySize defaultOffHeap = JobManagerOptions.OFF_HEAP_MEMORY.defaultValue();
+		MemorySize expectedOffHeap = MemorySize.ofMebiBytes(100).add(defaultOffHeap);
+		MemorySize totalFlinkMemory = jvmHeap.add(expectedOffHeap);
+
+		Configuration conf = new Configuration();
+		conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+		JobManagerProcessSpec JobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig(conf);
+		assertThat(JobManagerProcessSpec.getJvmDirectMemorySize(), is(expectedOffHeap));
+		MatcherAssert.assertThat(
+			testLoggerResource.getMessages(),
+			hasItem(containsString(String.format(
+				"The Off-Heap Memory size (%s) is derived the configured Total Flink Memory size (%s) minus " +
+					"the configured JVM Heap Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
+				expectedOffHeap.toHumanReadableString(),
+				totalFlinkMemory.toHumanReadableString(),
+				jvmHeap.toHumanReadableString(),
+				defaultOffHeap.toHumanReadableString()))));
+	}
+
 	@Override
 	protected JobManagerProcessSpec processSpecFromConfig(Configuration config) {
 		return JobManagerProcessUtils.processSpecFromConfig(config);