You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:10 UTC
[flink] 07/10: [FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21d7503098ea1b01191f4d6c3fffb1aac4b0d45a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 19:12:33 2019 +0200
[FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.
---
.../mesos/runtime/clusterframework/MesosResourceManager.java | 11 ++++-------
.../flink/runtime/resourcemanager/ResourceManager.java | 12 ++++++++++--
.../main/java/org/apache/flink/yarn/YarnResourceManager.java | 8 ++------
3 files changed, 16 insertions(+), 15 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
old mode 100644
new mode 100755
index 3be156e..e4ef99a
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,7 +20,6 @@ package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
@@ -185,7 +184,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
this.mesosServices = Preconditions.checkNotNull(mesosServices);
this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
- this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+ // copy the config, because we might change it for the TaskManagers
+ this.flinkConfig = new Configuration(Preconditions.checkNotNull(flinkConfig));
this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
@@ -199,11 +199,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
this.workersBeingReturned = new HashMap<>(8);
final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
- this.slotsPerWorker = createSlotsPerWorker(flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
-
- // set the exact managed memory size into configuration, to make sure TMs will derive the same size
- final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
- this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
+ this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+ flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
}
protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4dfe5bc..b845d1d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -1195,14 +1196,21 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
// ------------------------------------------------------------------------
@VisibleForTesting
- public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
+ public static Collection<ResourceProfile> updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+ Configuration config, long totalMemoryMB, int numSlots) {
+
final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
- final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
+ updateFlinkConfForManagedMemory(config, managedMemoryBytes);
+ final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
return Collections.nCopies(numSlots, resourceProfile);
}
+
+ static void updateFlinkConfForManagedMemory(Configuration conf, long managedMemorySize) {
+ conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemorySize + "b");
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
old mode 100644
new mode 100755
index a1321ea..6ebf669
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -158,7 +158,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
clusterInformation,
fatalErrorHandler,
jobManagerMetricGroup);
- this.flinkConfig = flinkConfig;
+ this.flinkConfig = new Configuration(flinkConfig); // copy, because we alter the config
this.yarnConfig = new YarnConfiguration();
this.env = env;
this.workerNodeMap = new ConcurrentHashMap<>();
@@ -184,11 +184,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
- this.slotsPerWorker = createSlotsPerWorker(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
-
- // set the exact managed memory size into configuration, to make sure TMs will derive the same size
- final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
- this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
+ this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
}
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(