You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:09 UTC
[flink] 06/10: [FLINK-12812] [runtime] (follow-up) Deduplicate
managed memory configuration logic between ResourceManager and
TaskManagerServices
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 494e12d57d4cc780749bfabe9a6bc7fec9c6887d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 18:41:18 2019 +0200
[FLINK-12812] [runtime] (follow-up) Deduplicate managed memory configuration logic between ResourceManager and TaskManagerServices
---
.../runtime/resourcemanager/ResourceManager.java | 26 ++++------------------
.../runtime/taskexecutor/TaskManagerServices.java | 1 -
.../apache/flink/yarn/YarnResourceManagerTest.java | 7 +++---
3 files changed, 8 insertions(+), 26 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
old mode 100644
new mode 100755
index 59e4c39..4dfe5bc
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -68,7 +66,7 @@ import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -86,7 +84,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -1200,26 +1197,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
@VisibleForTesting
public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
+ final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
+ final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
- final long networkMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
- (totalMemoryMB - cutoffMB) << 20, // megabytes to bytes
- config) >> 20; // bytes to megabytes
+ final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
- final long managedMB;
- if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
- managedMB = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
- } else {
- managedMB = (long) ((totalMemoryMB - cutoffMB - networkMB) * config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION));
- }
-
- ResourceProfile resourceProfile = new ResourceProfile(
- Double.MAX_VALUE,
- Integer.MAX_VALUE,
- Integer.MAX_VALUE,
- Integer.MAX_VALUE,
- Integer.MAX_VALUE,
- (int) (managedMB / numSlots),
- Collections.emptyMap());
return Collections.nCopies(numSlots, resourceProfile);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index d4e27e0..3db847dd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -525,7 +525,6 @@ public class TaskManagerServices {
}
}
- @VisibleForTesting
public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
int managedMemoryPerSlotMB = (int) bytesToMegabytes(managedMemorySize / numOfSlots);
return new ResourceProfile(
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 5d10a10..2b504c0 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -565,13 +565,14 @@ public class YarnResourceManagerTest extends TestLogger {
runTest(() -> {
ResourceProfile rmCalculatedResourceProfile =
- ResourceManager.createSlotsPerWorker(config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
- .iterator().next();
+ ResourceManager.updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+ config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+ .iterator().next();
ResourceProfile tmCalculatedResourceProfile =
TaskManagerServices.computeSlotResourceProfile(
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
- MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes());
+ MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
});