You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:04 UTC

[flink] 13/21: Add backwards compatibility

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b5be514ad75d0343792b95ea5be8f83b0fa25d8
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Oct 31 17:06:58 2019 +0100

    Add backwards compatibility
---
 flink-dist/src/main/resources/flink-conf.yaml                      | 4 ++--
 .../flink/runtime/clusterframework/TaskExecutorResourceUtils.java  | 7 +++++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a3bc57d..15d7aa9 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -42,9 +42,9 @@ jobmanager.rpc.port: 6123
 jobmanager.heap.size: 1024m
 
 
-# The heap size for the TaskManager JVM
+# Total Flink process size for the TaskExecutor
 
-taskmanager.heap.size: 1024m
+taskmanager.memory.total-process.size: 1024m
 
 
 # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index f2a275f..4b649e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -417,7 +417,9 @@ public class TaskExecutorResourceUtils {
 			return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
 		} else {
 			@SuppressWarnings("deprecation")
-			final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
+			final long legacyHeapMemoryMB = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) ?
+				config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) :
+				MemorySize.parse(config.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes(); // both branches must yield MB (not bytes): the value is shifted << 20 right after
 			return new MemorySize(legacyHeapMemoryMB << 20); // megabytes to bytes
 		}
 	}
@@ -427,7 +429,8 @@ public class TaskExecutorResourceUtils {
 	}
 
 	private static boolean isManagedMemorySizeExplicitlyConfigured(final Configuration config) {
-		return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+		return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE) ||
+			config.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE);
 	}
 
 	private static boolean isManagedMemoryOffHeapFractionExplicitlyConfigured(final Configuration config) {