You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/06/11 07:36:07 UTC

[flink] 02/02: [FLINK-18214][Runtime] Remove Job Cache size check against JVM Heap size

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ca7fe0b69b906ab54d3cdbe79643421536321e0
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Jun 10 18:14:31 2020 +0300

    [FLINK-18214][Runtime] Remove Job Cache size check against JVM Heap size
    
    Checking that the job cache size is less than the JVM heap size of the JM may be inconclusive and confusing for users. The job cache size option does not strictly limit the real size and remains an advanced emergency measure. The job size calculation is approximate, and the real cache size can be larger than its configured limit (`jobstore.cache-size`).
    
    Therefore, this PR removes the related code from `JobManagerFlinkMemoryUtils`, the tests, and the memory tuning guide.
    
    This closes #12590.
---
 docs/ops/memory/mem_setup_master.md                 |  5 +----
 docs/ops/memory/mem_setup_master.zh.md              |  5 +----
 .../jobmanager/JobManagerFlinkMemoryUtils.java      | 20 ++------------------
 .../jobmanager/JobManagerProcessUtilsTest.java      | 21 ---------------------
 4 files changed, 4 insertions(+), 47 deletions(-)

diff --git a/docs/ops/memory/mem_setup_master.md b/docs/ops/memory/mem_setup_master.md
index d086427..70e259abb 100644
--- a/docs/ops/memory/mem_setup_master.md
+++ b/docs/ops/memory/mem_setup_master.md
@@ -62,15 +62,12 @@ As mentioned before in the [total memory description](mem_setup.html#configure-t
 for the Master is to specify explicitly the *JVM Heap* size ([`jobmanager.memory.heap.size`](../config.html#jobmanager-memory-heap-size)).
 It gives more control over the available *JVM Heap* which is used by:
 
-* Flink framework (e.g. *Job cache*)
+* Flink framework
 * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks
 
 The required size of *JVM Heap* is mostly driven by the number of running jobs, their structure, and requirements for
 the mentioned user code.
 
-The *Job cache* resides in the *JVM Heap*. It can be configured by
-[`jobstore.cache-size`](../config.html#jobstore-cache-size) which must be less than the configured or derived *JVM Heap* size.
-
 <span class="label label-info">Note</span> If you have configured the *JVM Heap* explicitly, it is recommended to set
 neither *total process memory* nor *total Flink memory*. Otherwise, it may easily lead to memory configuration conflicts.
 The Flink scripts and CLI set the *JVM Heap* size via the JVM parameters *-Xms* and *-Xmx* when they start the Master process, see also [JVM parameters](mem_setup.html#jvm-parameters).
diff --git a/docs/ops/memory/mem_setup_master.zh.md b/docs/ops/memory/mem_setup_master.zh.md
index 54b5165..2887577 100644
--- a/docs/ops/memory/mem_setup_master.zh.md
+++ b/docs/ops/memory/mem_setup_master.zh.md
@@ -62,15 +62,12 @@ As mentioned before in the [total memory description](mem_setup.html#configure-t
 for the Master is to specify explicitly the *JVM Heap* size ([`jobmanager.memory.heap.size`](../config.html#jobmanager-memory-heap-size)).
 It gives more control over the available *JVM Heap* which is used by:
 
-* Flink framework (e.g. *Job cache*)
+* Flink framework
 * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks
 
 The required size of *JVM Heap* is mostly driven by the number of running jobs, their structure, and requirements for
 the mentioned user code.
 
-The *Job cache* resides in the *JVM Heap*. It can be configured by
-[`jobstore.cache-size`](../config.html#jobstore-cache-size) which must be less than the configured or derived *JVM Heap* size.
-
 <span class="label label-info">Note</span> If you have configured the *JVM Heap* explicitly, it is recommended to set
 neither *total process memory* nor *total Flink memory*. Otherwise, it may easily lead to memory configuration conflicts.
 The Flink scripts and CLI set the *JVM Heap* size via the JVM parameters *-Xms* and *-Xmx* when they start the Master process, see also [JVM parameters](mem_setup.html#jvm-parameters).
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
index 9f4f13e..58e5b4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
@@ -53,7 +53,7 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
 			}
 		}
 
-		return createJobManagerFlinkMemory(config, jvmHeapMemorySize, offHeapMemorySize);
+		return createJobManagerFlinkMemory(jvmHeapMemorySize, offHeapMemorySize);
 	}
 
 	private static void sanityCheckTotalFlinkMemory(
@@ -107,15 +107,13 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
 				offHeapMemorySize.toHumanReadableString());
 		}
 		MemorySize derivedJvmHeapMemorySize = totalFlinkMemorySize.subtract(offHeapMemorySize);
-		return createJobManagerFlinkMemory(config, derivedJvmHeapMemorySize, offHeapMemorySize);
+		return createJobManagerFlinkMemory(derivedJvmHeapMemorySize, offHeapMemorySize);
 	}
 
 	private static JobManagerFlinkMemory createJobManagerFlinkMemory(
-			Configuration config,
 			MemorySize jvmHeap,
 			MemorySize offHeapMemory) {
 		verifyJvmHeapSize(jvmHeap);
-		verifyJobStoreCacheSize(config, jvmHeap);
 		return new JobManagerFlinkMemory(jvmHeap, offHeapMemory);
 	}
 
@@ -127,18 +125,4 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
 				JobManagerOptions.MIN_JVM_HEAP_SIZE.toHumanReadableString());
 		}
 	}
-
-	private static void verifyJobStoreCacheSize(Configuration config, MemorySize jvmHeapSize) {
-		MemorySize jobStoreCacheHeapSize =
-			MemorySize.parse(config.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE) + "b");
-		if (jvmHeapSize.compareTo(jobStoreCacheHeapSize) < 0) {
-			LOG.warn(
-				"The configured or derived JVM heap memory size ({}: {}) is less than the configured or default size " +
-					"of the job store cache ({}: {})",
-				JobManagerOptions.JVM_HEAP_MEMORY.key(),
-				jvmHeapSize.toHumanReadableString(),
-				JobManagerOptions.JOB_STORE_CACHE_SIZE.key(),
-				jobStoreCacheHeapSize.toHumanReadableString());
-		}
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index d6c202e..5cf1cca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -84,27 +84,6 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 	}
 
 	@Test
-	public void testLogFailureOfJobStoreCacheSizeVerification() {
-		MemorySize jvmHeapMemory = MemorySize.parse("150m");
-		MemorySize jobStoreCacheSize = MemorySize.parse("200m");
-
-		Configuration conf = new Configuration();
-		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
-		conf.set(JobManagerOptions.JOB_STORE_CACHE_SIZE, jobStoreCacheSize.getBytes());
-
-		JobManagerProcessUtils.processSpecFromConfig(conf);
-		MatcherAssert.assertThat(
-			testLoggerResource.getMessages(),
-			hasItem(containsString(String.format(
-				"The configured or derived JVM heap memory size (%s: %s) is less than the configured or default size " +
-					"of the job store cache (%s: %s)",
-				JobManagerOptions.JVM_HEAP_MEMORY.key(),
-				jvmHeapMemory.toHumanReadableString(),
-				JobManagerOptions.JOB_STORE_CACHE_SIZE.key(),
-				jobStoreCacheSize.toHumanReadableString()))));
-	}
-
-	@Test
 	public void testConfigOffHeapMemory() {
 		MemorySize offHeapMemory = MemorySize.parse("100m");