You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/21 16:49:36 UTC

[flink] branch master updated: [FLINK-18993][Runtime]Invoke sanityCheckTotalFlinkMemory method incorrectly

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new df525b7  [FLINK-18993][Runtime]Invoke sanityCheckTotalFlinkMemory method incorrectly
df525b7 is described below

commit df525b77d29ccd89649a64e5faad96c93f61ca08
Author: Peng <wo...@foxmail.com>
AuthorDate: Wed Aug 19 14:44:53 2020 +0800

    [FLINK-18993][Runtime]Invoke sanityCheckTotalFlinkMemory method incorrectly
    
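    When off-heap memory is explicitly set by the user, pass the off-heap memory size
    (instead of the total Flink memory size) as the third argument of
    sanityCheckTotalFlinkMemory, and add a test that configures total Flink memory,
    JVM heap and off-heap memory together.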
    
    This closes #13198.
---
 .../memory/jobmanager/JobManagerFlinkMemoryUtils.java    |  2 +-
 .../runtime/jobmanager/JobManagerProcessUtilsTest.java   | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
index 58e5b4a..7b556a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
@@ -46,7 +46,7 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
 			MemorySize totalFlinkMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_FLINK_MEMORY);
 			if (config.contains(JobManagerOptions.OFF_HEAP_MEMORY)) {
 				// off-heap memory is explicitly set by user
-				sanityCheckTotalFlinkMemory(totalFlinkMemorySize, jvmHeapMemorySize, totalFlinkMemorySize);
+				sanityCheckTotalFlinkMemory(totalFlinkMemorySize, jvmHeapMemorySize, offHeapMemorySize);
 			} else {
 				// off-heap memory is not explicitly set by user, derive it from Total Flink Memory and JVM Heap
 				offHeapMemorySize = deriveOffHeapMemory(jvmHeapMemorySize, totalFlinkMemorySize, offHeapMemorySize);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index 93990b9..b79cf22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -145,6 +145,22 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 				defaultOffHeap.toHumanReadableString()))));
 	}
 
+	@Test
+	public void testDeriveFromRequiredFineGrainedOptions() {
+		MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+		MemorySize offHeap = MemorySize.ofMebiBytes(50);
+		MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(200);
+		MemorySize expectedOffHeap = MemorySize.ofMebiBytes(50);
+
+		Configuration conf = new Configuration();
+		conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+		conf.set(JobManagerOptions.OFF_HEAP_MEMORY, offHeap);
+		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+		JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig(conf);
+		assertThat(jobManagerProcessSpec.getJvmDirectMemorySize(), is(expectedOffHeap));
+	}
+
 	@Override
 	protected JobManagerProcessSpec processSpecFromConfig(Configuration config) {
 		return JobManagerProcessUtils.processSpecFromConfig(config);