You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:05 UTC
[flink] 02/10: [FLINK-12812][runtime] Set resource profiles for task slots
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c8b93b354d6d72ef3cacc9b0b9adcdb1df26e625
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jul 4 17:10:30 2019 +0800
[FLINK-12812][runtime] Set resource profiles for task slots
- Add managed memory in ResourceSpec and ResourceProfile.
- Set resource profiles for task slots when starting TaskManagers.
- Set resource profiles for pending task manager slots.
---
.../flink/api/common/operators/ResourceSpec.java | 33 ++++++++++--
.../clusterframework/MesosResourceManager.java | 7 ++-
.../clusterframework/types/ResourceProfile.java | 32 +++++++++---
.../runtime/resourcemanager/ResourceManager.java | 32 +++++++++++-
.../runtime/taskexecutor/TaskManagerServices.java | 20 +++++++-
.../types/ResourceProfileTest.java | 28 +++++-----
.../org/apache/flink/yarn/YarnResourceManager.java | 6 ++-
.../apache/flink/yarn/YarnResourceManagerTest.java | 59 +++++++++++++++++++++-
8 files changed, 185 insertions(+), 32 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 7bc2948..5b1b7b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -43,6 +43,7 @@ import java.util.Objects;
* <li>Direct Memory Size</li>
* <li>Native Memory Size</li>
* <li>State Size</li>
+ * <li>Managed Memory Size</li>
* <li>Extended resources</li>
* </ol>
*/
@@ -51,7 +52,7 @@ public class ResourceSpec implements Serializable {
private static final long serialVersionUID = 1L;
- public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);
+ public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0, 0);
public static final String GPU_NAME = "GPU";
@@ -70,6 +71,9 @@ public class ResourceSpec implements Serializable {
/** How many state size in mb are used. */
private final int stateSizeInMB;
+ /** The required amount of managed memory (in MB). */
+ private final int managedMemoryInMB;
+
private final Map<String, Resource> extendedResources = new HashMap<>(1);
/**
@@ -80,6 +84,7 @@ public class ResourceSpec implements Serializable {
* @param directMemoryInMB The size of the java nio direct memory, in megabytes.
* @param nativeMemoryInMB The size of the native memory, in megabytes.
* @param stateSizeInMB The state size for storing in checkpoint.
+ * @param managedMemoryInMB The size of managed memory, in megabytes.
* @param extendedResources The extended resources, associated with the resource manager used
*/
protected ResourceSpec(
@@ -88,12 +93,14 @@ public class ResourceSpec implements Serializable {
int directMemoryInMB,
int nativeMemoryInMB,
int stateSizeInMB,
+ int managedMemoryInMB,
Resource... extendedResources) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
this.nativeMemoryInMB = nativeMemoryInMB;
this.stateSizeInMB = stateSizeInMB;
+ this.managedMemoryInMB = managedMemoryInMB;
for (Resource resource : extendedResources) {
if (resource != null) {
this.extendedResources.put(resource.getName(), resource);
@@ -114,7 +121,8 @@ public class ResourceSpec implements Serializable {
this.heapMemoryInMB + other.heapMemoryInMB,
this.directMemoryInMB + other.directMemoryInMB,
this.nativeMemoryInMB + other.nativeMemoryInMB,
- this.stateSizeInMB + other.stateSizeInMB);
+ this.stateSizeInMB + other.stateSizeInMB,
+ this.managedMemoryInMB + other.managedMemoryInMB);
target.extendedResources.putAll(extendedResources);
for (Resource resource : other.extendedResources.values()) {
target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
@@ -142,6 +150,10 @@ public class ResourceSpec implements Serializable {
return this.stateSizeInMB;
}
+ public int getManagedMemory() {
+ return this.managedMemoryInMB;
+ }
+
public double getGPUResource() {
Resource gpuResource = extendedResources.get(GPU_NAME);
if (gpuResource != null) {
@@ -162,7 +174,7 @@ public class ResourceSpec implements Serializable {
*/
public boolean isValid() {
if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
- this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
+ this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0 && managedMemoryInMB >= 0) {
for (Resource resource : extendedResources.values()) {
if (resource.getValue() < 0) {
return false;
@@ -187,7 +199,8 @@ public class ResourceSpec implements Serializable {
int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
- if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
+ int cmp6 = Integer.compare(this.managedMemoryInMB, other.managedMemoryInMB);
+ if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0 && cmp6 <= 0) {
for (Resource resource : extendedResources.values()) {
if (!other.extendedResources.containsKey(resource.getName()) ||
other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
@@ -211,6 +224,7 @@ public class ResourceSpec implements Serializable {
this.directMemoryInMB == that.directMemoryInMB &&
this.nativeMemoryInMB == that.nativeMemoryInMB &&
this.stateSizeInMB == that.stateSizeInMB &&
+ this.managedMemoryInMB == that.managedMemoryInMB &&
Objects.equals(this.extendedResources, that.extendedResources);
} else {
return false;
@@ -225,6 +239,7 @@ public class ResourceSpec implements Serializable {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+ result = 31 * result + managedMemoryInMB;
result = 31 * result + extendedResources.hashCode();
return result;
}
@@ -240,7 +255,8 @@ public class ResourceSpec implements Serializable {
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
- ", stateSizeInMB=" + stateSizeInMB + extend +
+ ", stateSizeInMB=" + stateSizeInMB +
+ ", managedMemoryInMB=" + managedMemoryInMB + extend +
'}';
}
@@ -258,6 +274,7 @@ public class ResourceSpec implements Serializable {
private int directMemoryInMB;
private int nativeMemoryInMB;
private int stateSizeInMB;
+ private int managedMemoryInMB;
private GPUResource gpuResource;
public Builder setCpuCores(double cpuCores) {
@@ -285,6 +302,11 @@ public class ResourceSpec implements Serializable {
return this;
}
+ public Builder setManagedMemoryInMB(int managedMemory) {
+ this.managedMemoryInMB = managedMemory;
+ return this;
+ }
+
public Builder setGPUResource(double gpus) {
this.gpuResource = new GPUResource(gpus);
return this;
@@ -297,6 +319,7 @@ public class ResourceSpec implements Serializable {
directMemoryInMB,
nativeMemoryInMB,
stateSizeInMB,
+ managedMemoryInMB,
gpuResource);
}
}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 04cf7cc..3be156e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
@@ -198,7 +199,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
this.workersBeingReturned = new HashMap<>(8);
final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
- this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
+ this.slotsPerWorker = createSlotsPerWorker(flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
+
+ // set the exact managed memory size into configuration, to make sure TMs will derive the same size
+ final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
+ this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
}
protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 8137d9f..21eb9b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -51,7 +51,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
/** ResourceProfile which matches any other ResourceProfile. */
- public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+ public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
// ------------------------------------------------------------------------
@@ -70,6 +70,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
/** Memory used for the task in the slot to communicate with its upstreams. Set by job master. */
private final int networkMemoryInMB;
+ /** The required amount of managed memory (in MB). */
+ private final int managedMemoryInMB;
+
/** A extensible field for user specified resources from {@link ResourceSpec}. */
private final Map<String, Resource> extendedResources = new HashMap<>(1);
@@ -83,6 +86,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param directMemoryInMB The size of the direct memory, in megabytes.
* @param nativeMemoryInMB The size of the native memory, in megabytes.
* @param networkMemoryInMB The size of the memory for input and output, in megabytes.
+ * @param managedMemoryInMB The size of managed memory, in megabytes.
* @param extendedResources The extended resources such as GPU and FPGA
*/
public ResourceProfile(
@@ -91,12 +95,14 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
int directMemoryInMB,
int nativeMemoryInMB,
int networkMemoryInMB,
+ int managedMemoryInMB,
Map<String, Resource> extendedResources) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
this.nativeMemoryInMB = nativeMemoryInMB;
this.networkMemoryInMB = networkMemoryInMB;
+ this.managedMemoryInMB = managedMemoryInMB;
if (extendedResources != null) {
this.extendedResources.putAll(extendedResources);
}
@@ -109,7 +115,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param heapMemoryInMB The size of the heap memory, in megabytes.
*/
public ResourceProfile(double cpuCores, int heapMemoryInMB) {
- this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.emptyMap());
+ this(cpuCores, heapMemoryInMB, 0, 0, 0, 0, Collections.emptyMap());
}
/**
@@ -123,6 +129,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
other.directMemoryInMB,
other.nativeMemoryInMB,
other.networkMemoryInMB,
+ other.managedMemoryInMB,
other.extendedResources);
}
@@ -173,12 +180,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
}
/**
+ * Get the managed memory needed in MB.
+ * @return The managed memory in MB.
+ */
+ public int getManagedMemoryInMB() {
+ return managedMemoryInMB;
+ }
+
+ /**
* Get the total memory needed in MB.
*
* @return The total memory in MB
*/
public int getMemoryInMB() {
- return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB;
+ return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB + managedMemoryInMB;
}
/**
@@ -187,7 +202,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @return The operator memory in MB
*/
public int getOperatorsMemoryInMB() {
- return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+ return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + managedMemoryInMB;
}
/**
@@ -215,7 +230,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
heapMemoryInMB >= required.getHeapMemoryInMB() &&
directMemoryInMB >= required.getDirectMemoryInMB() &&
nativeMemoryInMB >= required.getNativeMemoryInMB() &&
- networkMemoryInMB >= required.getNetworkMemoryInMB()) {
+ networkMemoryInMB >= required.getNetworkMemoryInMB() &&
+ managedMemoryInMB >= required.getManagedMemoryInMB()) {
for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
if (!extendedResources.containsKey(resource.getKey()) ||
!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
@@ -270,6 +286,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + networkMemoryInMB;
+ result = 31 * result + managedMemoryInMB;
result = 31 * result + extendedResources.hashCode();
return result;
}
@@ -286,6 +303,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
this.directMemoryInMB == that.directMemoryInMB &&
this.nativeMemoryInMB == that.nativeMemoryInMB &&
this.networkMemoryInMB == that.networkMemoryInMB &&
+ this.managedMemoryInMB == that.managedMemoryInMB &&
Objects.equals(extendedResources, that.extendedResources);
}
return false;
@@ -302,7 +320,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
- ", networkMemoryInMB=" + networkMemoryInMB + resources +
+ ", networkMemoryInMB=" + networkMemoryInMB +
+ ", managedMemoryInMB=" + managedMemoryInMB + resources +
'}';
}
@@ -315,6 +334,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
resourceSpec.getDirectMemory(),
resourceSpec.getNativeMemory(),
networkMemory,
+ resourceSpec.getManagedMemory(),
copiedExtendedResources);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 0ed9247..59e4c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -22,8 +22,12 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -64,6 +68,7 @@ import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -81,6 +86,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -1191,8 +1197,30 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
// Helper methods
// ------------------------------------------------------------------------
- protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
- return Collections.nCopies(numSlots, ResourceProfile.ANY);
+ @VisibleForTesting
+ public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
+ final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
+
+ final long networkMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
+ (totalMemoryMB - cutoffMB) << 20, // megabytes to bytes
+ config) >> 20; // bytes to megabytes
+
+ final long managedMB;
+ if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+ managedMB = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
+ } else {
+ managedMB = (long) ((totalMemoryMB - cutoffMB - networkMB) * config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION));
+ }
+
+ ResourceProfile resourceProfile = new ResourceProfile(
+ Double.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ (int) (managedMB / numSlots),
+ Collections.emptyMap());
+ return Collections.nCopies(numSlots, resourceProfile);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 8de72f1..dfcd5e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -257,13 +258,15 @@ public class TaskManagerServices {
// this call has to happen strictly after the network stack has been initialized
final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
+ final long managedMemorySize = memoryManager.getMemorySize();
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
- final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
+ final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
+ final List<ResourceProfile> resourceProfiles = new ArrayList<>(numOfSlots);
for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
- resourceProfiles.add(ResourceProfile.ANY);
+ resourceProfiles.add(computeSlotResourceProfile(numOfSlots, managedMemorySize));
}
final TimerService<AllocationID> timerService = new TimerService<>(
@@ -507,4 +510,17 @@ public class TaskManagerServices {
}
}
}
+
+ @VisibleForTesting
+ public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
+ int managedMemoryPerSlot = (int) (managedMemorySize / numOfSlots);
+ return new ResourceProfile(
+ Double.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ managedMemoryPerSlot,
+ Collections.emptyMap());
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 3a42c33..a6c5f58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
@Test
public void testMatchRequirement() throws Exception {
- ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.emptyMap());
- ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.emptyMap());
- ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.emptyMap());
- ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.emptyMap());
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, 0, Collections.emptyMap());
+ ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, 0, Collections.emptyMap());
+ ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, 0, Collections.emptyMap());
+ ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, 0, Collections.emptyMap());
assertFalse(rp1.isMatching(rp2));
assertTrue(rp2.isMatching(rp1));
@@ -50,7 +50,7 @@ public class ResourceProfileTest {
assertTrue(rp4.isMatching(rp3));
assertTrue(rp4.isMatching(rp4));
- ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, null);
+ ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, 100, null);
assertFalse(rp4.isMatching(rp5));
ResourceSpec rs1 = ResourceSpec.newBuilder().
@@ -99,20 +99,22 @@ public class ResourceProfileTest {
build();
assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
- ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
- ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, Collections.emptyMap());
- ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, Collections.emptyMap());
- ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, Collections.emptyMap());
- ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, Collections.emptyMap());
- ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, Collections.emptyMap());
- ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
+ ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, 100, Collections.emptyMap());
+ ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, 100, Collections.emptyMap());
+ ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, 100, Collections.emptyMap());
+ ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, 100, Collections.emptyMap());
+ ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, 100, Collections.emptyMap());
+ ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, 110, Collections.emptyMap());
+ ResourceProfile rp8 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
assertFalse(rp1.equals(rp2));
assertFalse(rp1.equals(rp3));
assertFalse(rp1.equals(rp4));
assertFalse(rp1.equals(rp5));
assertFalse(rp1.equals(rp6));
- assertTrue(rp1.equals(rp7));
+ assertFalse(rp1.equals(rp7));
+ assertTrue(rp1.equals(rp8));
}
@Test
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 65baab5..a1321ea 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -184,7 +184,11 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
- this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
+ this.slotsPerWorker = createSlotsPerWorker(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
+
+ // set the exact managed memory size into configuration, to make sure TMs will derive the same size
+ final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
+ this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
}
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb75b66..5d10a10 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,7 +21,10 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -53,6 +57,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
@@ -261,6 +266,10 @@ public class YarnResourceManagerTest extends TestLogger {
* Create mock RM dependencies.
*/
Context() throws Exception {
+ this(flinkConfig);
+ }
+
+ Context(Configuration configuration) throws Exception {
rpcService = new TestingRpcService();
rmServices = new MockResourceManagerRuntimeServices();
@@ -271,7 +280,7 @@ public class YarnResourceManagerTest extends TestLogger {
rpcService,
RM_ADDRESS,
rmResourceID,
- flinkConfig,
+ configuration,
env,
rmServices.highAvailabilityServices,
rmServices.heartbeatServices,
@@ -419,7 +428,7 @@ public class YarnResourceManagerTest extends TestLogger {
final SlotReport slotReport = new SlotReport(
new SlotStatus(
new SlotID(taskManagerResourceId, 1),
- new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
+ new ResourceProfile(10, 1, 1, 1, 0, 0, Collections.emptyMap())));
CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
.registerTaskExecutor(
@@ -522,4 +531,50 @@ public class YarnResourceManagerTest extends TestLogger {
});
}};
}
+
+ /**
+ * Tests that RM and TM calculate same slot resource profile.
+ * @throws Exception
+ */
+ @Test
+ public void testCreateSlotsPerWorker() throws Exception {
+ testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 100));
+
+ Configuration config1 = new Configuration();
+ config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+ testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 10));
+
+ Configuration config2 = new Configuration();
+ config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "789m");
+ testCreateSlotsPerWorker(config2, Resource.newInstance(800, 50));
+
+ Configuration config3 = new Configuration();
+ config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "300m");
+ config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+ testCreateSlotsPerWorker(config3, Resource.newInstance(2000, 60));
+
+ Configuration config4 = new Configuration();
+ config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "10m");
+ config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "10m");
+ config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+ testCreateSlotsPerWorker(config4, Resource.newInstance(1000, 1));
+ }
+
+ private void testCreateSlotsPerWorker(Configuration config, Resource resource) throws Exception {
+ new Context(config) {{
+ runTest(() -> {
+
+ ResourceProfile rmCalculatedResourceProfile =
+ ResourceManager.createSlotsPerWorker(config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+ .iterator().next();
+
+ ResourceProfile tmCalculatedResourceProfile =
+ TaskManagerServices.computeSlotResourceProfile(
+ config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
+ MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes());
+
+ assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
+ });
+ }};
+ }
}