You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:05 UTC

[flink] 02/10: [FLINK-12812][runtime] Set resource profiles for task slots

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8b93b354d6d72ef3cacc9b0b9adcdb1df26e625
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jul 4 17:10:30 2019 +0800

    [FLINK-12812][runtime] Set resource profiles for task slots
    
      - Add managed memory in ResourceSpec and ResourceProfile.
      - Set resource profiles for task slots when starting TaskManagers.
      - Set resource profiles for pending task manager slots.
---
 .../flink/api/common/operators/ResourceSpec.java   | 33 ++++++++++--
 .../clusterframework/MesosResourceManager.java     |  7 ++-
 .../clusterframework/types/ResourceProfile.java    | 32 +++++++++---
 .../runtime/resourcemanager/ResourceManager.java   | 32 +++++++++++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 20 +++++++-
 .../types/ResourceProfileTest.java                 | 28 +++++-----
 .../org/apache/flink/yarn/YarnResourceManager.java |  6 ++-
 .../apache/flink/yarn/YarnResourceManagerTest.java | 59 +++++++++++++++++++++-
 8 files changed, 185 insertions(+), 32 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 7bc2948..5b1b7b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -43,6 +43,7 @@ import java.util.Objects;
  *     <li>Direct Memory Size</li>
  *     <li>Native Memory Size</li>
  *     <li>State Size</li>
+ *     <li>Managed Memory Size</li>
  *     <li>Extended resources</li>
  * </ol>
  */
@@ -51,7 +52,7 @@ public class ResourceSpec implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);
+	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0, 0);
 
 	public static final String GPU_NAME = "GPU";
 
@@ -70,6 +71,9 @@ public class ResourceSpec implements Serializable {
 	/** How many state size in mb are used. */
 	private final int stateSizeInMB;
 
+	/** The required amount of managed memory (in MB). */
+	private final int managedMemoryInMB;
+
 	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
 	/**
@@ -80,6 +84,7 @@ public class ResourceSpec implements Serializable {
 	 * @param directMemoryInMB The size of the java nio direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 	 * @param stateSizeInMB The state size for storing in checkpoint.
+	 * @param managedMemoryInMB The size of managed memory, in megabytes.
 	 * @param extendedResources The extended resources, associated with the resource manager used
 	 */
 	protected ResourceSpec(
@@ -88,12 +93,14 @@ public class ResourceSpec implements Serializable {
 			int directMemoryInMB,
 			int nativeMemoryInMB,
 			int stateSizeInMB,
+			int managedMemoryInMB,
 			Resource... extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
 		this.stateSizeInMB = stateSizeInMB;
+		this.managedMemoryInMB = managedMemoryInMB;
 		for (Resource resource : extendedResources) {
 			if (resource != null) {
 				this.extendedResources.put(resource.getName(), resource);
@@ -114,7 +121,8 @@ public class ResourceSpec implements Serializable {
 				this.heapMemoryInMB + other.heapMemoryInMB,
 				this.directMemoryInMB + other.directMemoryInMB,
 				this.nativeMemoryInMB + other.nativeMemoryInMB,
-				this.stateSizeInMB + other.stateSizeInMB);
+				this.stateSizeInMB + other.stateSizeInMB,
+				this.managedMemoryInMB + other.managedMemoryInMB);
 		target.extendedResources.putAll(extendedResources);
 		for (Resource resource : other.extendedResources.values()) {
 			target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
@@ -142,6 +150,10 @@ public class ResourceSpec implements Serializable {
 		return this.stateSizeInMB;
 	}
 
+	public int getManagedMemory() {
+		return this.managedMemoryInMB;
+	}
+
 	public double getGPUResource() {
 		Resource gpuResource = extendedResources.get(GPU_NAME);
 		if (gpuResource != null) {
@@ -162,7 +174,7 @@ public class ResourceSpec implements Serializable {
 	 */
 	public boolean isValid() {
 		if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
-				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
+				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0 && managedMemoryInMB >= 0) {
 			for (Resource resource : extendedResources.values()) {
 				if (resource.getValue() < 0) {
 					return false;
@@ -187,7 +199,8 @@ public class ResourceSpec implements Serializable {
 		int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
 		int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
 		int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
-		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
+		int cmp6 = Integer.compare(this.managedMemoryInMB, other.managedMemoryInMB);
+		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0 && cmp6 <= 0) {
 			for (Resource resource : extendedResources.values()) {
 				if (!other.extendedResources.containsKey(resource.getName()) ||
 					other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
@@ -211,6 +224,7 @@ public class ResourceSpec implements Serializable {
 					this.directMemoryInMB == that.directMemoryInMB &&
 					this.nativeMemoryInMB == that.nativeMemoryInMB &&
 					this.stateSizeInMB == that.stateSizeInMB &&
+					this.managedMemoryInMB == that.managedMemoryInMB &&
 					Objects.equals(this.extendedResources, that.extendedResources);
 		} else {
 			return false;
@@ -225,6 +239,7 @@ public class ResourceSpec implements Serializable {
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
 		result = 31 * result + stateSizeInMB;
+		result = 31 * result + managedMemoryInMB;
 		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
@@ -240,7 +255,8 @@ public class ResourceSpec implements Serializable {
 				", heapMemoryInMB=" + heapMemoryInMB +
 				", directMemoryInMB=" + directMemoryInMB +
 				", nativeMemoryInMB=" + nativeMemoryInMB +
-				", stateSizeInMB=" + stateSizeInMB + extend +
+				", stateSizeInMB=" + stateSizeInMB +
+				", managedMemoryInMB=" + managedMemoryInMB + extend +
 				'}';
 	}
 
@@ -258,6 +274,7 @@ public class ResourceSpec implements Serializable {
 		private int directMemoryInMB;
 		private int nativeMemoryInMB;
 		private int stateSizeInMB;
+		private int managedMemoryInMB;
 		private GPUResource gpuResource;
 
 		public Builder setCpuCores(double cpuCores) {
@@ -285,6 +302,11 @@ public class ResourceSpec implements Serializable {
 			return this;
 		}
 
+		public Builder setManagedMemoryInMB(int managedMemory) {
+			this.managedMemoryInMB = managedMemory;
+			return this;
+		}
+
 		public Builder setGPUResource(double gpus) {
 			this.gpuResource = new GPUResource(gpus);
 			return this;
@@ -297,6 +319,7 @@ public class ResourceSpec implements Serializable {
 				directMemoryInMB,
 				nativeMemoryInMB,
 				stateSizeInMB,
+				managedMemoryInMB,
 				gpuResource);
 		}
 	}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 04cf7cc..3be156e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
@@ -198,7 +199,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersBeingReturned = new HashMap<>(8);
 
 		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
-		this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
+		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
+
+		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
+		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
+		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
 	}
 
 	protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 8137d9f..21eb9b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -51,7 +51,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
 
 	/** ResourceProfile which matches any other ResourceProfile. */
-	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
 
 	// ------------------------------------------------------------------------
 
@@ -70,6 +70,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	/** Memory used for the task in the slot to communicate with its upstreams. Set by job master. */
 	private final int networkMemoryInMB;
 
+	/** The required amount of managed memory (in MB). */
+	private final int managedMemoryInMB;
+
 	/** A extensible field for user specified resources from {@link ResourceSpec}. */
 	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
@@ -83,6 +86,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param directMemoryInMB The size of the direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 	 * @param networkMemoryInMB The size of the memory for input and output, in megabytes.
+	 * @param managedMemoryInMB The size of managed memory, in megabytes.
 	 * @param extendedResources The extended resources such as GPU and FPGA
 	 */
 	public ResourceProfile(
@@ -91,12 +95,14 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			int directMemoryInMB,
 			int nativeMemoryInMB,
 			int networkMemoryInMB,
+			int managedMemoryInMB,
 			Map<String, Resource> extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
 		this.networkMemoryInMB = networkMemoryInMB;
+		this.managedMemoryInMB = managedMemoryInMB;
 		if (extendedResources != null) {
 			this.extendedResources.putAll(extendedResources);
 		}
@@ -109,7 +115,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 */
 	public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-		this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.emptyMap());
+		this(cpuCores, heapMemoryInMB, 0, 0, 0, 0, Collections.emptyMap());
 	}
 
 	/**
@@ -123,6 +129,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				other.directMemoryInMB,
 				other.nativeMemoryInMB,
 				other.networkMemoryInMB,
+				other.managedMemoryInMB,
 				other.extendedResources);
 	}
 
@@ -173,12 +180,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
+	 * Get the managed memory needed in MB.
+	 * @return The managed memory in MB.
+	 */
+	public int getManagedMemoryInMB() {
+		return managedMemoryInMB;
+	}
+
+	/**
 	 * Get the total memory needed in MB.
 	 *
 	 * @return The total memory in MB
 	 */
 	public int getMemoryInMB() {
-		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB;
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB + managedMemoryInMB;
 	}
 
 	/**
@@ -187,7 +202,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @return The operator memory in MB
 	 */
 	public int getOperatorsMemoryInMB() {
-		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + managedMemoryInMB;
 	}
 
 	/**
@@ -215,7 +230,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				heapMemoryInMB >= required.getHeapMemoryInMB() &&
 				directMemoryInMB >= required.getDirectMemoryInMB() &&
 				nativeMemoryInMB >= required.getNativeMemoryInMB() &&
-				networkMemoryInMB >= required.getNetworkMemoryInMB()) {
+				networkMemoryInMB >= required.getNetworkMemoryInMB() &&
+				managedMemoryInMB >= required.getManagedMemoryInMB()) {
 			for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
 				if (!extendedResources.containsKey(resource.getKey()) ||
 						!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
@@ -270,6 +286,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
 		result = 31 * result + networkMemoryInMB;
+		result = 31 * result + managedMemoryInMB;
 		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
@@ -286,6 +303,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 					this.directMemoryInMB == that.directMemoryInMB &&
 					this.nativeMemoryInMB == that.nativeMemoryInMB &&
 					this.networkMemoryInMB == that.networkMemoryInMB &&
+					this.managedMemoryInMB == that.managedMemoryInMB &&
 					Objects.equals(extendedResources, that.extendedResources);
 		}
 		return false;
@@ -302,7 +320,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			", heapMemoryInMB=" + heapMemoryInMB +
 			", directMemoryInMB=" + directMemoryInMB +
 			", nativeMemoryInMB=" + nativeMemoryInMB +
-			", networkMemoryInMB=" + networkMemoryInMB + resources +
+			", networkMemoryInMB=" + networkMemoryInMB +
+			", managedMemoryInMB=" + managedMemoryInMB + resources +
 			'}';
 	}
 
@@ -315,6 +334,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				resourceSpec.getDirectMemory(),
 				resourceSpec.getNativeMemory(),
 				networkMemory,
+				resourceSpec.getManagedMemory(),
 				copiedExtendedResources);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 0ed9247..59e4c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -22,8 +22,12 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -64,6 +68,7 @@ import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -81,6 +86,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -1191,8 +1197,30 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	//  Helper methods
 	// ------------------------------------------------------------------------
 
-	protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
-		return Collections.nCopies(numSlots, ResourceProfile.ANY);
+	@VisibleForTesting
+	public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
+		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
+
+		final long networkMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
+			(totalMemoryMB - cutoffMB) << 20, // megabytes to bytes
+			config) >> 20; // bytes to megabytes
+
+		final long managedMB;
+		if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+			managedMB = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
+		} else {
+			managedMB = (long) ((totalMemoryMB - cutoffMB - networkMB) * config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION));
+		}
+
+		ResourceProfile resourceProfile = new ResourceProfile(
+			Double.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			(int) (managedMB / numSlots),
+			Collections.emptyMap());
+		return Collections.nCopies(numSlots, resourceProfile);
 	}
 }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 8de72f1..dfcd5e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -257,13 +258,15 @@ public class TaskManagerServices {
 
 		// this call has to happen strictly after the network stack has been initialized
 		final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
+		final long managedMemorySize = memoryManager.getMemorySize();
 
 		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
-		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
+		final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
+		final List<ResourceProfile> resourceProfiles = new ArrayList<>(numOfSlots);
 
 		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
-			resourceProfiles.add(ResourceProfile.ANY);
+			resourceProfiles.add(computeSlotResourceProfile(numOfSlots, managedMemorySize));
 		}
 
 		final TimerService<AllocationID> timerService = new TimerService<>(
@@ -507,4 +510,17 @@ public class TaskManagerServices {
 			}
 		}
 	}
+
+	@VisibleForTesting
+	public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
+		int managedMemoryPerSlot = (int) (managedMemorySize / numOfSlots);
+		return new ResourceProfile(
+			Double.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			managedMemoryPerSlot,
+			Collections.emptyMap());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 3a42c33..a6c5f58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMatchRequirement() throws Exception {
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.emptyMap());
-		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.emptyMap());
-		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.emptyMap());
-		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.emptyMap());
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, 0, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, 0, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, 0, Collections.emptyMap());
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, 0, Collections.emptyMap());
 
 		assertFalse(rp1.isMatching(rp2));
 		assertTrue(rp2.isMatching(rp1));
@@ -50,7 +50,7 @@ public class ResourceProfileTest {
 		assertTrue(rp4.isMatching(rp3));
 		assertTrue(rp4.isMatching(rp4));
 
-		ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, null);
+		ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, 100, null);
 		assertFalse(rp4.isMatching(rp5));
 
 		ResourceSpec rs1 = ResourceSpec.newBuilder().
@@ -99,20 +99,22 @@ public class ResourceProfileTest {
 				build();
 		assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
 
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
-		ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, Collections.emptyMap());
-		ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, Collections.emptyMap());
-		ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, Collections.emptyMap());
-		ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, Collections.emptyMap());
-		ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, Collections.emptyMap());
-		ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, 100, Collections.emptyMap());
+		ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, 100, Collections.emptyMap());
+		ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, 110, Collections.emptyMap());
+		ResourceProfile rp8 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
 
 		assertFalse(rp1.equals(rp2));
 		assertFalse(rp1.equals(rp3));
 		assertFalse(rp1.equals(rp4));
 		assertFalse(rp1.equals(rp5));
 		assertFalse(rp1.equals(rp6));
-		assertTrue(rp1.equals(rp7));
+		assertFalse(rp1.equals(rp7));
+		assertTrue(rp1.equals(rp8));
 	}
 
 	@Test
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 65baab5..a1321ea 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -184,7 +184,11 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
-		this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
+		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
+
+		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
+		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
+		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb75b66..5d10a10 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,7 +21,10 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -53,6 +57,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
@@ -261,6 +266,10 @@ public class YarnResourceManagerTest extends TestLogger {
 		 * Create mock RM dependencies.
 		 */
 		Context() throws Exception {
+			this(flinkConfig);
+		}
+
+		Context(Configuration configuration) throws  Exception {
 			rpcService = new TestingRpcService();
 			rmServices = new MockResourceManagerRuntimeServices();
 
@@ -271,7 +280,7 @@ public class YarnResourceManagerTest extends TestLogger {
 							rpcService,
 							RM_ADDRESS,
 							rmResourceID,
-							flinkConfig,
+							configuration,
 							env,
 							rmServices.highAvailabilityServices,
 							rmServices.heartbeatServices,
@@ -419,7 +428,7 @@ public class YarnResourceManagerTest extends TestLogger {
 				final SlotReport slotReport = new SlotReport(
 					new SlotStatus(
 						new SlotID(taskManagerResourceId, 1),
-						new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
+						new ResourceProfile(10, 1, 1, 1, 0, 0, Collections.emptyMap())));
 
 				CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
 					.registerTaskExecutor(
@@ -522,4 +531,50 @@ public class YarnResourceManagerTest extends TestLogger {
 			});
 		}};
 	}
+
+	/**
+	 * Tests that RM and TM calculate same slot resource profile.
+	 * @throws Exception
+	 */
+	@Test
+	public void testCreateSlotsPerWorker() throws Exception {
+		testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 100));
+
+		Configuration config1 = new Configuration();
+		config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+		testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 10));
+
+		Configuration config2 = new Configuration();
+		config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "789m");
+		testCreateSlotsPerWorker(config2,  Resource.newInstance(800, 50));
+
+		Configuration config3 = new Configuration();
+		config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "300m");
+		config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		testCreateSlotsPerWorker(config3,  Resource.newInstance(2000, 60));
+
+		Configuration config4 = new Configuration();
+		config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "10m");
+		config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "10m");
+		config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		testCreateSlotsPerWorker(config4,  Resource.newInstance(1000, 1));
+	}
+
+	private void testCreateSlotsPerWorker(Configuration config, Resource resource) throws  Exception {
+		new Context(config) {{
+			runTest(() -> {
+
+				ResourceProfile rmCalculatedResourceProfile =
+					ResourceManager.createSlotsPerWorker(config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+						.iterator().next();
+
+				ResourceProfile tmCalculatedResourceProfile =
+					TaskManagerServices.computeSlotResourceProfile(
+						config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
+						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes());
+
+				assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
+			});
+		}};
+	}
 }