You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:09 UTC

[flink] 06/10: [FLINK-12812] [runtime] (follow-up) Deduplicate managed memory configuration logic between ResourceManager and TaskManagerServices

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 494e12d57d4cc780749bfabe9a6bc7fec9c6887d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 18:41:18 2019 +0200

    [FLINK-12812] [runtime] (follow-up) Deduplicate managed memory configuration logic between ResourceManager and TaskManagerServices
---
 .../runtime/resourcemanager/ResourceManager.java   | 26 ++++------------------
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 -
 .../apache/flink/yarn/YarnResourceManagerTest.java |  7 +++---
 3 files changed, 8 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
old mode 100644
new mode 100755
index 59e4c39..4dfe5bc
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -68,7 +66,7 @@ import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -86,7 +84,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -1200,26 +1197,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	@VisibleForTesting
 	public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
 		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
+		final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
+		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
 
-		final long networkMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
-			(totalMemoryMB - cutoffMB) << 20, // megabytes to bytes
-			config) >> 20; // bytes to megabytes
+		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 
-		final long managedMB;
-		if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-			managedMB = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
-		} else {
-			managedMB = (long) ((totalMemoryMB - cutoffMB - networkMB) * config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION));
-		}
-
-		ResourceProfile resourceProfile = new ResourceProfile(
-			Double.MAX_VALUE,
-			Integer.MAX_VALUE,
-			Integer.MAX_VALUE,
-			Integer.MAX_VALUE,
-			Integer.MAX_VALUE,
-			(int) (managedMB / numSlots),
-			Collections.emptyMap());
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index d4e27e0..3db847dd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -525,7 +525,6 @@ public class TaskManagerServices {
 		}
 	}
 
-	@VisibleForTesting
 	public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
 		int managedMemoryPerSlotMB = (int) bytesToMegabytes(managedMemorySize / numOfSlots);
 		return new ResourceProfile(
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 5d10a10..2b504c0 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -565,13 +565,14 @@ public class YarnResourceManagerTest extends TestLogger {
 			runTest(() -> {
 
 				ResourceProfile rmCalculatedResourceProfile =
-					ResourceManager.createSlotsPerWorker(config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
-						.iterator().next();
+					ResourceManager.updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+						config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+								.iterator().next();
 
 				ResourceProfile tmCalculatedResourceProfile =
 					TaskManagerServices.computeSlotResourceProfile(
 						config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
-						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes());
+						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
 
 				assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
 			});