You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/09/10 11:14:20 UTC
[flink] branch release-1.11 updated: [FLINK-19151][yarn] Update
container resource normalization algorithm,
with respect to Yarn FairScheduler.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new bfff6b1 [FLINK-19151][yarn] Update container resource normalization algorithm, with respect to Yarn FairScheduler.
bfff6b1 is described below
commit bfff6b15ec7dd3a4415f6a5a9d8535ea7960e474
Author: jinhai <ji...@gmail.com>
AuthorDate: Mon Sep 7 20:57:26 2020 +0800
[FLINK-19151][yarn] Update container resource normalization algorithm, with respect to Yarn FairScheduler.
This closes #13347.
---
.../src/main/java/org/apache/flink/yarn/Utils.java | 73 ++++++++++++++++++++++
.../yarn/WorkerSpecContainerResourceAdapter.java | 18 ++++--
.../org/apache/flink/yarn/YarnResourceManager.java | 17 +----
.../test/java/org/apache/flink/yarn/UtilsTest.java | 46 ++++++++++++++
.../WorkerSpecContainerResourceAdapterTest.java | 60 +++++++++++++-----
5 files changed, 178 insertions(+), 36 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 54dd063..a28f55b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,10 +18,13 @@
package org.apache.flink.yarn;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -59,6 +63,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
@@ -79,6 +84,21 @@ public final class Utils {
/** Yarn site xml file name populated in YARN container for secure IT run. */
public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
+ @VisibleForTesting
+ static final String YARN_RM_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";
+ @VisibleForTesting
+ static final String YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler";
+ @VisibleForTesting
+ static final String YARN_RM_INCREMENT_ALLOCATION_MB_KEY = "yarn.resource-types.memory-mb.increment-allocation";
+ @VisibleForTesting
+ static final String YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY = "yarn.scheduler.increment-allocation-mb";
+ private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB = 1024;
+ @VisibleForTesting
+ static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY = "yarn.resource-types.vcores.increment-allocation";
+ @VisibleForTesting
+ static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY = "yarn.scheduler.increment-allocation-vcores";
+ private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1;
+
public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
addToEnvironment(
appMasterEnv,
@@ -520,4 +540,57 @@ public final class Utils {
throw new RuntimeException(String.format(message, values));
}
}
+
+ public static WorkerSpecContainerResourceAdapter createWorkerSpecContainerResourceAdapter(
+ org.apache.flink.configuration.Configuration flinkConfig,
+ YarnConfiguration yarnConfig) {
+
+ Resource unitResource = getUnitResource(yarnConfig);
+
+ return new WorkerSpecContainerResourceAdapter(
+ flinkConfig,
+ yarnConfig.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
+ yarnConfig.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
+ yarnConfig.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
+ yarnConfig.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
+ unitResource.getMemory(),
+ unitResource.getVirtualCores(),
+ ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
+ }
+
+ @VisibleForTesting
+ static Resource getUnitResource(YarnConfiguration yarnConfig) {
+ final int unitMemMB, unitVcore;
+
+ final String yarnRmSchedulerClazzName = yarnConfig.get(YarnConfiguration.RM_SCHEDULER);
+ if (Objects.equals(yarnRmSchedulerClazzName, YARN_RM_FAIR_SCHEDULER_CLAZZ) ||
+ Objects.equals(yarnRmSchedulerClazzName, YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ)) {
+ String propMem = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_MB_KEY);
+ String propVcore = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY);
+
+ unitMemMB = propMem != null ?
+ Integer.parseInt(propMem) :
+ yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB);
+ unitVcore = propVcore != null ?
+ Integer.parseInt(propVcore) :
+ yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES);
+ } else {
+ unitMemMB = yarnConfig.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ unitVcore = yarnConfig.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ }
+
+ return Resource.newInstance(unitMemMB, unitVcore);
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
index e430e81..7c07c67 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
@@ -50,8 +50,10 @@ class WorkerSpecContainerResourceAdapter {
private final Configuration flinkConfig;
private final int minMemMB;
private final int maxMemMB;
+ private final int unitMemMB;
private final int minVcore;
private final int maxVcore;
+ private final int unitVcore;
private final Map<String, Long> externalResourceConfigs;
private final Map<WorkerResourceSpec, InternalContainerResource> workerSpecToContainerResource;
private final Map<InternalContainerResource, Set<WorkerResourceSpec>> containerResourceToWorkerSpecs;
@@ -63,12 +65,16 @@ class WorkerSpecContainerResourceAdapter {
final int minVcore,
final int maxMemMB,
final int maxVcore,
+ final int unitMemMB,
+ final int unitVcore,
final Map<String, Long> externalResourceConfigs) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.minMemMB = minMemMB;
this.minVcore = minVcore;
this.maxMemMB = maxMemMB;
this.maxVcore = maxVcore;
+ this.unitMemMB = unitMemMB;
+ this.unitVcore = unitVcore;
this.externalResourceConfigs = Preconditions.checkNotNull(externalResourceConfigs);
workerSpecToContainerResource = new HashMap<>();
containerResourceToWorkerSpecs = new HashMap<>();
@@ -122,8 +128,8 @@ class WorkerSpecContainerResourceAdapter {
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
final InternalContainerResource internalContainerResource = new InternalContainerResource(
- normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB),
- normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore),
+ normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB, unitMemMB),
+ normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore, unitVcore),
externalResourceConfigs);
if (resourceWithinMaxAllocation(internalContainerResource)) {
@@ -141,10 +147,12 @@ class WorkerSpecContainerResourceAdapter {
}
/**
- * Normalize to the minimum integer that is greater or equal to 'value' and is positive integer multiple of 'unitValue'.
+ * Normalize to the minimum integer that is greater or equal to both 'value' and 'minValue',
+ * and is positive integer multiple of 'unitValue'.
*/
- private int normalize(final int value, final int unitValue) {
- return Math.max(MathUtils.divideRoundUp(value, unitValue), 1) * unitValue;
+ private int normalize(final int value, final int minValue, final int unitValue) {
+ int rValue = Math.max(value, minValue);
+ return Math.max(MathUtils.divideRoundUp(rValue, unitValue), 1) * unitValue;
}
private boolean resourceWithinMaxAllocation(final InternalContainerResource resource) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 3de7937..2837312 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
@@ -171,21 +170,7 @@ public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
this.webInterfaceUrl = webInterfaceUrl;
- this.workerSpecContainerResourceAdapter = new WorkerSpecContainerResourceAdapter(
- flinkConfig,
- yarnConfig.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
- yarnConfig.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
- yarnConfig.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
- yarnConfig.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
- ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
+ this.workerSpecContainerResourceAdapter = Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(log);
this.matchingStrategy = flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index a5ae14c..7c09430 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.yarn;
import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -30,6 +32,7 @@ import java.util.Collections;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
@@ -37,6 +40,9 @@ import static org.junit.Assert.assertThat;
*/
public class UtilsTest extends TestLogger {
+ private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
+
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -58,4 +64,44 @@ public class UtilsTest extends TestLogger {
assertThat(files.count(), equalTo(0L));
}
}
+
+ @Test
+ public void testGetUnitResource() {
+ final int minMem = 64;
+ final int minVcore = 1;
+ final int incMem = 512;
+ final int incVcore = 2;
+ final int incMemLegacy = 1024;
+ final int incVcoreLegacy = 4;
+
+ YarnConfiguration yarnConfig = new YarnConfiguration();
+ yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, minMem);
+ yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, minVcore);
+ yarnConfig.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, incMemLegacy);
+ yarnConfig.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, incVcoreLegacy);
+
+ verifyUnitResourceVariousSchedulers(yarnConfig, minMem, minVcore, incMemLegacy, incVcoreLegacy);
+
+ yarnConfig.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_MB_KEY, incMem);
+ yarnConfig.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY, incVcore);
+
+ verifyUnitResourceVariousSchedulers(yarnConfig, minMem, minVcore, incMem, incVcore);
+ }
+
+ private static void verifyUnitResourceVariousSchedulers(YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
+ yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
+ verifyUnitResource(yarnConfig, incMem, incVcore);
+
+ yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ);
+ verifyUnitResource(yarnConfig, incMem, incVcore);
+
+ yarnConfig.set(YarnConfiguration.RM_SCHEDULER, YARN_RM_ARBITRARY_SCHEDULER_CLAZZ);
+ verifyUnitResource(yarnConfig, minMem, minVcore);
+ }
+
+ private static void verifyUnitResource(YarnConfiguration yarnConfig, int expectedMem, int expectedVcore) {
+ final Resource unitResource = Utils.getUnitResource(yarnConfig);
+ assertThat(unitResource.getMemory(), is(expectedMem));
+ assertThat(unitResource.getVirtualCores(), is(expectedVcore));
+ }
}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
index 1f466f4..29fb0c9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
@@ -53,6 +53,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE;
final int minMemMB = 100;
final int minVcore = 10;
+ final int unitMemMB = 50;
+ final int unitVcore = 5;
final WorkerSpecContainerResourceAdapter adapter =
new WorkerSpecContainerResourceAdapter(
getConfigProcessSpecEqualsWorkerSpec(),
@@ -60,15 +62,20 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
minVcore,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
+ unitMemMB,
+ unitVcore,
Collections.emptyMap());
+ // mem < minMem, vcore < minVcore, should be normalized to [minMem, minVcore]
final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder()
- .setCpuCores(1.0)
- .setTaskHeapMemoryMB(10)
- .setTaskOffHeapMemoryMB(10)
- .setNetworkMemoryMB(10)
- .setManagedMemoryMB(10)
+ .setCpuCores(8.0)
+ .setTaskHeapMemoryMB(20)
+ .setTaskOffHeapMemoryMB(20)
+ .setNetworkMemoryMB(20)
+ .setManagedMemoryMB(20)
.build();
+
+ // mem = minMem, mem % unitMem = 0, vcore = minVcore, vcore % unitVcore = 0, should not be changed
final WorkerResourceSpec workerSpec2 = new WorkerResourceSpec.Builder()
.setCpuCores(10.0)
.setTaskHeapMemoryMB(25)
@@ -76,24 +83,28 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
.setNetworkMemoryMB(25)
.setManagedMemoryMB(25)
.build();
+
+ // mem > minMem, mem % unitMem != 0, vcore < minVcore, should be normalized to [n * unitMem, minVcore]
final WorkerResourceSpec workerSpec3 = new WorkerResourceSpec.Builder()
- .setCpuCores(5.0)
+ .setCpuCores(8.0)
.setTaskHeapMemoryMB(30)
.setTaskOffHeapMemoryMB(30)
.setNetworkMemoryMB(30)
.setManagedMemoryMB(30)
.build();
+
+ // mem < minMem, vcore > minVcore, vcore % unitVcore != 0, should be normalized to [minMem, n * unitVcore]
final WorkerResourceSpec workerSpec4 = new WorkerResourceSpec.Builder()
- .setCpuCores(15.0)
- .setTaskHeapMemoryMB(10)
- .setTaskOffHeapMemoryMB(10)
- .setNetworkMemoryMB(10)
- .setManagedMemoryMB(10)
+ .setCpuCores(12.0)
+ .setTaskHeapMemoryMB(20)
+ .setTaskOffHeapMemoryMB(20)
+ .setNetworkMemoryMB(20)
+ .setManagedMemoryMB(20)
.build();
final Resource containerResource1 = Resource.newInstance(100, 10);
- final Resource containerResource2 = Resource.newInstance(200, 10);
- final Resource containerResource3 = Resource.newInstance(100, 20);
+ final Resource containerResource2 = Resource.newInstance(150, 10);
+ final Resource containerResource3 = Resource.newInstance(100, 15);
assertThat(adapter.getWorkerSpecs(containerResource1, strategy), empty());
assertThat(adapter.getWorkerSpecs(containerResource2, strategy), empty());
@@ -114,6 +125,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
final int minMemMB = 100;
final int minVcore = 1;
+ final int unitMemMB = 50;
+ final int unitVcore = 1;
final WorkerSpecContainerResourceAdapter adapter =
new WorkerSpecContainerResourceAdapter(
getConfigProcessSpecEqualsWorkerSpec(),
@@ -121,8 +134,11 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
minVcore,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
+ unitMemMB,
+ unitVcore,
Collections.emptyMap());
+ // mem < minMem, should be normalized to [minMem, vcore], equivalent to [minMem, 1]
final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder()
.setCpuCores(5.0)
.setTaskHeapMemoryMB(10)
@@ -130,6 +146,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
.setNetworkMemoryMB(10)
.setManagedMemoryMB(10)
.build();
+
+ // mem < minMem, should be normalized to [minMem, vcore], equivalent to [minMem, 1]
final WorkerResourceSpec workerSpec2 = new WorkerResourceSpec.Builder()
.setCpuCores(10.0)
.setTaskHeapMemoryMB(10)
@@ -137,6 +155,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
.setNetworkMemoryMB(10)
.setManagedMemoryMB(10)
.build();
+
+ // mem = minMem, mem % unitMem = 0, should not be changed, equivalent to [mem, 1]
final WorkerResourceSpec workerSpec3 = new WorkerResourceSpec.Builder()
.setCpuCores(5.0)
.setTaskHeapMemoryMB(25)
@@ -144,6 +164,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
.setNetworkMemoryMB(25)
.setManagedMemoryMB(25)
.build();
+
+ // mem > minMem, mem % unitMem != 0, should be normalized to [n * unitMem, vcore], equivalent to [n * unitMem, 1]
final WorkerResourceSpec workerSpec4 = new WorkerResourceSpec.Builder()
.setCpuCores(5.0)
.setTaskHeapMemoryMB(30)
@@ -154,10 +176,10 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
final Resource containerResource1 = Resource.newInstance(100, 5);
final Resource containerResource2 = Resource.newInstance(100, 10);
- final Resource containerResource3 = Resource.newInstance(200, 5);
+ final Resource containerResource3 = Resource.newInstance(150, 5);
final Resource containerResource4 = Resource.newInstance(100, 1);
- final Resource containerResource5 = Resource.newInstance(200, 1);
+ final Resource containerResource5 = Resource.newInstance(150, 1);
assertThat(adapter.tryComputeContainerResource(workerSpec1).get(), is(containerResource1));
assertThat(adapter.tryComputeContainerResource(workerSpec2).get(), is(containerResource2));
@@ -177,6 +199,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
final int minVcore = 1;
final int maxMemMB = 1000;
final int maxVcore = 10;
+ final int unitMemMB = 100;
+ final int unitVcore = 1;
final WorkerSpecContainerResourceAdapter adapter =
new WorkerSpecContainerResourceAdapter(
getConfigProcessSpecEqualsWorkerSpec(),
@@ -184,6 +208,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
minVcore,
maxMemMB,
maxVcore,
+ unitMemMB,
+ unitVcore,
Collections.emptyMap());
final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder()
@@ -211,6 +237,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
final int minMemMB = 1;
final int minVcore = 1;
+ final int unitMemMB = 1;
+ final int unitVcore = 1;
final WorkerSpecContainerResourceAdapter adapter =
new WorkerSpecContainerResourceAdapter(
@@ -219,6 +247,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
minVcore,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
+ unitMemMB,
+ unitVcore,
Collections.emptyMap());
final WorkerResourceSpec workerSpec = new WorkerResourceSpec.Builder()