You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:03 UTC

[flink] 12/21: Treat legacy TM heap size as total process memory, not flink memory

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 16:24:08 2019 +0100

    Treat legacy TM heap size as total process memory, not flink memory
---
 .../flink/configuration/TaskManagerOptions.java    |  2 +-
 .../TaskExecutorResourceUtils.java                 | 27 +++++++++++-----------
 .../TaskExecutorResourceUtilsTest.java             | 16 ++++++-------
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7d1492c..d717ad5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -253,6 +253,7 @@ public class TaskManagerOptions {
 	public static final ConfigOption<String> TOTAL_PROCESS_MEMORY =
 		key("taskmanager.memory.total-process.size")
 			.noDefaultValue()
+			.withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
 			.withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a"
 				+ " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On"
 				+ " containerized setups, this should be set to the container memory.");
@@ -264,7 +265,6 @@ public class TaskManagerOptions {
 	public static final ConfigOption<String> TOTAL_FLINK_MEMORY =
 		key("taskmanager.memory.total-flink.size")
 		.noDefaultValue()
-		.withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
 		.withDescription("Total Flink Memory size for the TaskExecutors. This includes all the memory that a"
 			+ " TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory,"
 			+ " Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 83d2d7d..f2a275f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -408,8 +408,13 @@ public class TaskExecutorResourceUtils {
 
 	private static MemorySize getTotalFlinkMemorySize(final Configuration config) {
 		checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config));
-		if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
-			return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+		return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+	}
+
+	private static MemorySize getTotalProcessMemorySize(final Configuration config) {
+		checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
+		if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+			return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
 		} else {
 			@SuppressWarnings("deprecation")
 			final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
@@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils {
 		}
 	}
 
-	private static MemorySize getTotalProcessMemorySize(final Configuration config) {
-		checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
-		return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-	}
-
 	private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final Configuration config) {
 		return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
 	}
@@ -454,15 +454,16 @@ public class TaskExecutorResourceUtils {
 	}
 
 	private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configuration config) {
-		// backward compatible with the deprecated config option TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly
-		// configured by the user
-		@SuppressWarnings("deprecation")
-		final boolean legacyConfigured = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
-		return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY) || legacyConfigured;
+		return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY);
 	}
 
 	private static boolean isTotalProcessMemorySizeExplicitlyConfigured(final Configuration config) {
-		return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
+		// backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY and TASK_MANAGER_HEAP_MEMORY_MB
+		// only when they are explicitly configured by the user
+		@SuppressWarnings("deprecation")
+		final boolean legacyConfigured =
+			config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) || config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+		return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY) || legacyConfigured;
 	}
 
 	private static void sanityCheckTotalFlinkMemory(final Configuration config, final FlinkInternalMemory flinkInternalMemory) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 432c1af..79ea059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -457,17 +457,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testConfigTotalFlinkMemoryLegacyMB() {
-		final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+	public void testConfigTotalProcessMemoryLegacyMB() {
+		final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
 
 		@SuppressWarnings("deprecation")
 		final ConfigOption<Integer> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB;
 
 		Configuration conf = new Configuration();
-		conf.setInteger(legacyOption, totalFlinkMemorySize.getMebiBytes());
+		conf.setInteger(legacyOption, totalProcessMemorySize.getMebiBytes());
 
 		TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
-		assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+		assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
 	}
 
 	@Test
@@ -495,17 +495,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testConfigTotalFlinkMemoryLegacySize() {
-		final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+	public void testConfigTotalProcessMemoryLegacySize() {
+		final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
 
 		@SuppressWarnings("deprecation")
 		final ConfigOption<String> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
 
 		Configuration conf = new Configuration();
-		conf.setString(legacyOption, totalFlinkMemorySize.getMebiBytes() + "m");
+		conf.setString(legacyOption, totalProcessMemorySize.getMebiBytes() + "m");
 
 		TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
-		assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+		assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
 	}
 
 	@Test