You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:10 UTC

[flink] 07/10: [FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21d7503098ea1b01191f4d6c3fffb1aac4b0d45a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 19:12:33 2019 +0200

    [FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.
---
 .../mesos/runtime/clusterframework/MesosResourceManager.java | 11 ++++-------
 .../flink/runtime/resourcemanager/ResourceManager.java       | 12 ++++++++++--
 .../main/java/org/apache/flink/yarn/YarnResourceManager.java |  8 ++------
 3 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
old mode 100644
new mode 100755
index 3be156e..e4ef99a
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,7 +20,6 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
@@ -185,7 +184,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.mesosServices = Preconditions.checkNotNull(mesosServices);
 		this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
 
-		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+		// copy the config, because we might change it for the TaskManagers
+		this.flinkConfig = new Configuration(Preconditions.checkNotNull(flinkConfig));
 		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
 		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
@@ -199,11 +199,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersBeingReturned = new HashMap<>(8);
 
 		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
-		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
-
-		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
-		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
-		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
+		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+			flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
 	}
 
 	protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4dfe5bc..b845d1d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -1195,14 +1196,21 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
+	public static Collection<ResourceProfile> updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+			Configuration config, long totalMemoryMB, int numSlots) {
+
 		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
 		final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
 		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
 
-		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
+		updateFlinkConfForManagedMemory(config, managedMemoryBytes);
 
+		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
+
+	static void updateFlinkConfForManagedMemory(Configuration conf, long managedMemorySize) {
+		conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemorySize + "b");
+	}
 }
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
old mode 100644
new mode 100755
index a1321ea..6ebf669
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -158,7 +158,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			clusterInformation,
 			fatalErrorHandler,
 			jobManagerMetricGroup);
-		this.flinkConfig  = flinkConfig;
+		this.flinkConfig  = new Configuration(flinkConfig); // copy, because we alter the config
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
 		this.workerNodeMap = new ConcurrentHashMap<>();
@@ -184,11 +184,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
-		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
-
-		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
-		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
-		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
+		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(