You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:03 UTC
[flink] 12/21: Treat legacy TM heap size as total process memory, not flink memory
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 16:24:08 2019 +0100
Treat legacy TM heap size as total process memory, not flink memory
---
.../flink/configuration/TaskManagerOptions.java | 2 +-
.../TaskExecutorResourceUtils.java | 27 +++++++++++-----------
.../TaskExecutorResourceUtilsTest.java | 16 ++++++-------
3 files changed, 23 insertions(+), 22 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7d1492c..d717ad5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -253,6 +253,7 @@ public class TaskManagerOptions {
public static final ConfigOption<String> TOTAL_PROCESS_MEMORY =
key("taskmanager.memory.total-process.size")
.noDefaultValue()
+ .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
.withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a"
+ " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On"
+ " containerized setups, this should be set to the container memory.");
@@ -264,7 +265,6 @@ public class TaskManagerOptions {
public static final ConfigOption<String> TOTAL_FLINK_MEMORY =
key("taskmanager.memory.total-flink.size")
.noDefaultValue()
- .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
.withDescription("Total Flink Memory size for the TaskExecutors. This includes all the memory that a"
+ " TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory,"
+ " Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 83d2d7d..f2a275f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -408,8 +408,13 @@ public class TaskExecutorResourceUtils {
private static MemorySize getTotalFlinkMemorySize(final Configuration config) {
checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config));
- if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
- return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+ return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+ }
+
+ private static MemorySize getTotalProcessMemorySize(final Configuration config) {
+ checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
+ if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+ return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
} else {
@SuppressWarnings("deprecation")
final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
@@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils {
}
}
- private static MemorySize getTotalProcessMemorySize(final Configuration config) {
- checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
- return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
- }
-
private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final Configuration config) {
return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
}
@@ -454,15 +454,16 @@ public class TaskExecutorResourceUtils {
}
private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configuration config) {
- // backward compatible with the deprecated config option TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly
- // configured by the user
- @SuppressWarnings("deprecation")
- final boolean legacyConfigured = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
- return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY) || legacyConfigured;
+ return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY);
}
private static boolean isTotalProcessMemorySizeExplicitlyConfigured(final Configuration config) {
- return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
+ // backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY and TASK_MANAGER_HEAP_MEMORY_MB
+ // only when they are explicitly configured by the user
+ @SuppressWarnings("deprecation")
+ final boolean legacyConfigured =
+ config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) || config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY) || legacyConfigured;
}
private static void sanityCheckTotalFlinkMemory(final Configuration config, final FlinkInternalMemory flinkInternalMemory) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 432c1af..79ea059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -457,17 +457,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
}
@Test
- public void testConfigTotalFlinkMemoryLegacyMB() {
- final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+ public void testConfigTotalProcessMemoryLegacyMB() {
+ final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
@SuppressWarnings("deprecation")
final ConfigOption<Integer> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB;
Configuration conf = new Configuration();
- conf.setInteger(legacyOption, totalFlinkMemorySize.getMebiBytes());
+ conf.setInteger(legacyOption, totalProcessMemorySize.getMebiBytes());
TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
- assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+ assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
}
@Test
@@ -495,17 +495,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
}
@Test
- public void testConfigTotalFlinkMemoryLegacySize() {
- final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+ public void testConfigTotalProcessMemoryLegacySize() {
+ final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
@SuppressWarnings("deprecation")
final ConfigOption<String> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
Configuration conf = new Configuration();
- conf.setString(legacyOption, totalFlinkMemorySize.getMebiBytes() + "m");
+ conf.setString(legacyOption, totalProcessMemorySize.getMebiBytes() + "m");
TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
- assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+ assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
}
@Test