You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/09/10 11:14:20 UTC

[flink] branch release-1.11 updated: [FLINK-19151][yarn] Update container resource normalization algorithm, with respect to Yarn FairScheduler.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new bfff6b1  [FLINK-19151][yarn] Update container resource normalization algorithm, with respect to Yarn FairScheduler.
bfff6b1 is described below

commit bfff6b15ec7dd3a4415f6a5a9d8535ea7960e474
Author: jinhai <ji...@gmail.com>
AuthorDate: Mon Sep 7 20:57:26 2020 +0800

    [FLINK-19151][yarn] Update container resource normalization algorithm, with respect to Yarn FairScheduler.
    
    This closes #13347.
---
 .../src/main/java/org/apache/flink/yarn/Utils.java | 73 ++++++++++++++++++++++
 .../yarn/WorkerSpecContainerResourceAdapter.java   | 18 ++++--
 .../org/apache/flink/yarn/YarnResourceManager.java | 17 +----
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 46 ++++++++++++++
 .../WorkerSpecContainerResourceAdapterTest.java    | 60 +++++++++++++-----
 5 files changed, 178 insertions(+), 36 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 54dd063..a28f55b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -59,6 +63,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
@@ -79,6 +84,21 @@ public final class Utils {
 	/** Yarn site xml file name populated in YARN container for secure IT run. */
 	public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
+	@VisibleForTesting
+	static final String YARN_RM_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"; // getUnitResource uses the increment allocation for this scheduler
+	@VisibleForTesting
+	static final String YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler"; // treated the same as the FairScheduler above
+	@VisibleForTesting
+	static final String YARN_RM_INCREMENT_ALLOCATION_MB_KEY = "yarn.resource-types.memory-mb.increment-allocation"; // checked before the legacy key
+	@VisibleForTesting
+	static final String YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY = "yarn.scheduler.increment-allocation-mb"; // fallback when the key above is unset
+	private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB = 1024; // used when neither memory increment key is set
+	@VisibleForTesting
+	static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY = "yarn.resource-types.vcores.increment-allocation"; // checked before the legacy key
+	@VisibleForTesting
+	static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY = "yarn.scheduler.increment-allocation-vcores"; // fallback when the key above is unset
+	private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1; // used when neither vcore increment key is set
+
 	public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
 		addToEnvironment(
 			appMasterEnv,
@@ -520,4 +540,57 @@ public final class Utils {
 			throw new RuntimeException(String.format(message, values));
 		}
 	}
+
+	/**
+	 * Creates a {@link WorkerSpecContainerResourceAdapter} from the Yarn minimum/maximum allocations, the
+	 * unit resource given by {@link #getUnitResource}, and the external resources in the Flink configuration.
+	 */
+	public static WorkerSpecContainerResourceAdapter createWorkerSpecContainerResourceAdapter(
+			org.apache.flink.configuration.Configuration flinkConfig,
+			YarnConfiguration yarnConfig) {
+		final Resource unit = getUnitResource(yarnConfig);
+		final int minMemMB = yarnConfig.getInt(
+			YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+		final int minVcore = yarnConfig.getInt(
+			YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+		final int maxMemMB = yarnConfig.getInt(
+			YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+		final int maxVcore = yarnConfig.getInt(
+			YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+		return new WorkerSpecContainerResourceAdapter(
+			flinkConfig,
+			minMemMB, minVcore,
+			maxMemMB, maxVcore,
+			unit.getMemory(), unit.getVirtualCores(),
+			ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
+	}
+
+	/**
+	 * Returns the unit (memory in MB, vcores) that container resources are rounded up to. For the
+	 * (SLS) FairScheduler this is the increment allocation, taken from the new key if set, else the
+	 * legacy key, else the default; for any other scheduler it is the minimum allocation.
+	 */
+	@VisibleForTesting
+	static Resource getUnitResource(YarnConfiguration yarnConfig) {
+		final int unitMemMB, unitVcore;
+		final String schedulerClazzName = yarnConfig.get(YarnConfiguration.RM_SCHEDULER);
+		if (Objects.equals(schedulerClazzName, YARN_RM_FAIR_SCHEDULER_CLAZZ) ||
+				Objects.equals(schedulerClazzName, YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ)) {
+			// getInt trims the raw value before parsing; Integer.parseInt on get() would reject padded values.
+			unitMemMB = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_MB_KEY) != null ?
+					yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_MB_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB) :
+					yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB);
+			unitVcore = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY) != null ?
+					yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES) :
+					yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES);
+		} else {
+			unitMemMB = yarnConfig.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+					YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+			unitVcore = yarnConfig.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+					YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+		}
+
+		return Resource.newInstance(unitMemMB, unitVcore);
+	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
index e430e81..7c07c67 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
@@ -50,8 +50,10 @@ class WorkerSpecContainerResourceAdapter {
 	private final Configuration flinkConfig;
 	private final int minMemMB;
 	private final int maxMemMB;
+	private final int unitMemMB;
 	private final int minVcore;
 	private final int maxVcore;
+	private final int unitVcore;
 	private final Map<String, Long> externalResourceConfigs;
 	private final Map<WorkerResourceSpec, InternalContainerResource> workerSpecToContainerResource;
 	private final Map<InternalContainerResource, Set<WorkerResourceSpec>> containerResourceToWorkerSpecs;
@@ -63,12 +65,16 @@ class WorkerSpecContainerResourceAdapter {
 		final int minVcore,
 		final int maxMemMB,
 		final int maxVcore,
+		final int unitMemMB,
+		final int unitVcore,
 		final Map<String, Long> externalResourceConfigs) {
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.minMemMB = minMemMB;
 		this.minVcore = minVcore;
 		this.maxMemMB = maxMemMB;
 		this.maxVcore = maxVcore;
+		this.unitMemMB = unitMemMB;
+		this.unitVcore = unitVcore;
 		this.externalResourceConfigs = Preconditions.checkNotNull(externalResourceConfigs);
 		workerSpecToContainerResource = new HashMap<>();
 		containerResourceToWorkerSpecs = new HashMap<>();
@@ -122,8 +128,8 @@ class WorkerSpecContainerResourceAdapter {
 		final TaskExecutorProcessSpec taskExecutorProcessSpec =
 			TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 		final InternalContainerResource internalContainerResource = new InternalContainerResource(
-			normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB),
-			normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore),
+			normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB, unitMemMB),
+			normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore, unitVcore),
 			externalResourceConfigs);
 
 		if (resourceWithinMaxAllocation(internalContainerResource)) {
@@ -141,10 +147,12 @@ class WorkerSpecContainerResourceAdapter {
 	}
 
 	/**
-	 * Normalize to the minimum integer that is greater or equal to 'value' and is positive integer multiple of 'unitValue'.
+	 * Normalize to the minimum integer that is greater or equal to both 'value' and 'minValue',
+	 * and is positive integer multiple of 'unitValue'.
 	 */
-	private int normalize(final int value, final int unitValue) {
-		return Math.max(MathUtils.divideRoundUp(value, unitValue), 1) * unitValue;
+	private int normalize(final int value, final int minValue, final int unitValue) {
+		int rValue = Math.max(value, minValue);
+		return Math.max(MathUtils.divideRoundUp(rValue, unitValue), 1) * unitValue;
 	}
 
 	private boolean resourceWithinMaxAllocation(final InternalContainerResource resource) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 3de7937..2837312 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
@@ -171,21 +170,7 @@ public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
 
 		this.webInterfaceUrl = webInterfaceUrl;
 
-		this.workerSpecContainerResourceAdapter = new WorkerSpecContainerResourceAdapter(
-			flinkConfig,
-			yarnConfig.getInt(
-				YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-				YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
-			yarnConfig.getInt(
-				YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-				YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
-			yarnConfig.getInt(
-				YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-				YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
-			yarnConfig.getInt(
-				YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-				YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
-			ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
+		this.workerSpecContainerResourceAdapter = Utils.createWorkerSpecContainerResourceAdapter(flinkConfig, yarnConfig);
 		this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(log);
 
 		this.matchingStrategy = flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index a5ae14c..7c09430 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -30,6 +32,7 @@ import java.util.Collections;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -37,6 +40,9 @@ import static org.junit.Assert.assertThat;
  */
 public class UtilsTest extends TestLogger {
 
+	private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
+			"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
+
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -58,4 +64,44 @@ public class UtilsTest extends TestLogger {
 			assertThat(files.count(), equalTo(0L));
 		}
 	}
+
+	@Test
+	public void testGetUnitResource() {
+		final int minAllocMem = 64;
+		final int minAllocVcore = 1;
+		final int legacyIncMem = 1024;
+		final int legacyIncVcore = 4;
+		final int newIncMem = 512;
+		final int newIncVcore = 2;
+
+		// Only the minimum allocations and the legacy increment keys are configured.
+		final YarnConfiguration conf = new YarnConfiguration();
+		conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, minAllocMem);
+		conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, minAllocVcore);
+		conf.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, legacyIncMem);
+		conf.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, legacyIncVcore);
+		verifyUnitResourceVariousSchedulers(conf, minAllocMem, minAllocVcore, legacyIncMem, legacyIncVcore);
+
+		// Once the new increment keys are set as well, they take precedence over the legacy ones.
+		conf.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_MB_KEY, newIncMem);
+		conf.setInt(Utils.YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY, newIncVcore);
+		verifyUnitResourceVariousSchedulers(conf, minAllocMem, minAllocVcore, newIncMem, newIncVcore);
+	}
+
+	/** Fair schedulers must report the increment allocation, any other scheduler the minimum allocation. */
+	private static void verifyUnitResourceVariousSchedulers(YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
+		for (String fairScheduler : new String[] {Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ, Utils.YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ}) {
+			yarnConfig.set(YarnConfiguration.RM_SCHEDULER, fairScheduler);
+			verifyUnitResource(yarnConfig, incMem, incVcore);
+		}
+		yarnConfig.set(YarnConfiguration.RM_SCHEDULER, YARN_RM_ARBITRARY_SCHEDULER_CLAZZ);
+		verifyUnitResource(yarnConfig, minMem, minVcore);
+	}
+
+	/** Asserts the memory and vcores of the unit resource computed from the given Yarn configuration. */
+	private static void verifyUnitResource(YarnConfiguration yarnConfig, int expectedMem, int expectedVcore) {
+		final Resource actual = Utils.getUnitResource(yarnConfig);
+		assertThat(actual.getMemory(), is(expectedMem));
+		assertThat(actual.getVirtualCores(), is(expectedVcore));
+	}
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
index 1f466f4..29fb0c9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
@@ -53,6 +53,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE;
 		final int minMemMB = 100;
 		final int minVcore = 10;
+		final int unitMemMB = 50;
+		final int unitVcore = 5;
 		final WorkerSpecContainerResourceAdapter adapter =
 			new WorkerSpecContainerResourceAdapter(
 				getConfigProcessSpecEqualsWorkerSpec(),
@@ -60,15 +62,20 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 				minVcore,
 				Integer.MAX_VALUE,
 				Integer.MAX_VALUE,
+				unitMemMB,
+				unitVcore,
 				Collections.emptyMap());
 
+		// mem < minMem, vcore < minVcore, should be normalized to [minMem, minVcore]
 		final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder()
-			.setCpuCores(1.0)
-			.setTaskHeapMemoryMB(10)
-			.setTaskOffHeapMemoryMB(10)
-			.setNetworkMemoryMB(10)
-			.setManagedMemoryMB(10)
+			.setCpuCores(8.0)
+			.setTaskHeapMemoryMB(20)
+			.setTaskOffHeapMemoryMB(20)
+			.setNetworkMemoryMB(20)
+			.setManagedMemoryMB(20)
 			.build();
+
+		// mem = minMem, mem % unitMem = 0, vcore = minVcore, vcore % unitVcore = 0, should not be changed
 		final WorkerResourceSpec workerSpec2 = new WorkerResourceSpec.Builder()
 			.setCpuCores(10.0)
 			.setTaskHeapMemoryMB(25)
@@ -76,24 +83,28 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			.setNetworkMemoryMB(25)
 			.setManagedMemoryMB(25)
 			.build();
+
+		// mem > minMem, mem % unitMem != 0, vcore < minVcore, should be normalized to [n * unitMem, minVcore]
 		final WorkerResourceSpec workerSpec3 = new WorkerResourceSpec.Builder()
-			.setCpuCores(5.0)
+			.setCpuCores(8.0)
 			.setTaskHeapMemoryMB(30)
 			.setTaskOffHeapMemoryMB(30)
 			.setNetworkMemoryMB(30)
 			.setManagedMemoryMB(30)
 			.build();
+
+		// mem < minMem, vcore > minVcore, vcore % unitVcore != 0, should be normalized to [minMem, n * unitVcore]
 		final WorkerResourceSpec workerSpec4 = new WorkerResourceSpec.Builder()
-			.setCpuCores(15.0)
-			.setTaskHeapMemoryMB(10)
-			.setTaskOffHeapMemoryMB(10)
-			.setNetworkMemoryMB(10)
-			.setManagedMemoryMB(10)
+			.setCpuCores(12.0)
+			.setTaskHeapMemoryMB(20)
+			.setTaskOffHeapMemoryMB(20)
+			.setNetworkMemoryMB(20)
+			.setManagedMemoryMB(20)
 			.build();
 
 		final Resource containerResource1 = Resource.newInstance(100, 10);
-		final Resource containerResource2 = Resource.newInstance(200, 10);
-		final Resource containerResource3 = Resource.newInstance(100, 20);
+		final Resource containerResource2 = Resource.newInstance(150, 10);
+		final Resource containerResource3 = Resource.newInstance(100, 15);
 
 		assertThat(adapter.getWorkerSpecs(containerResource1, strategy), empty());
 		assertThat(adapter.getWorkerSpecs(containerResource2, strategy), empty());
@@ -114,6 +125,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
 		final int minMemMB = 100;
 		final int minVcore = 1;
+		final int unitMemMB = 50;
+		final int unitVcore = 1;
 		final WorkerSpecContainerResourceAdapter adapter =
 			new WorkerSpecContainerResourceAdapter(
 				getConfigProcessSpecEqualsWorkerSpec(),
@@ -121,8 +134,11 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 				minVcore,
 				Integer.MAX_VALUE,
 				Integer.MAX_VALUE,
+				unitMemMB,
+				unitVcore,
 				Collections.emptyMap());
 
+		// mem < minMem, should be normalized to [minMem, vcore], equivalent to [minMem, 1]
 		final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder()
 			.setCpuCores(5.0)
 			.setTaskHeapMemoryMB(10)
@@ -130,6 +146,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			.setNetworkMemoryMB(10)
 			.setManagedMemoryMB(10)
 			.build();
+
+		// mem < minMem, should be normalized to [minMem, vcore], equivalent to [minMem, 1]
 		final WorkerResourceSpec workerSpec2 = new WorkerResourceSpec.Builder()
 			.setCpuCores(10.0)
 			.setTaskHeapMemoryMB(10)
@@ -137,6 +155,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			.setNetworkMemoryMB(10)
 			.setManagedMemoryMB(10)
 			.build();
+
+		// mem = minMem, mem % unitMem = 0, should not be changed, equivalent to [mem, 1]
 		final WorkerResourceSpec workerSpec3 = new WorkerResourceSpec.Builder()
 			.setCpuCores(5.0)
 			.setTaskHeapMemoryMB(25)
@@ -144,6 +164,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			.setNetworkMemoryMB(25)
 			.setManagedMemoryMB(25)
 			.build();
+
+		// mem > minMem, mem % unitMem != 0, should be normalized to [n * unitMem, vcore], equivalent to [n * unitMem, 1]
 		final WorkerResourceSpec workerSpec4 = new WorkerResourceSpec.Builder()
 			.setCpuCores(5.0)
 			.setTaskHeapMemoryMB(30)
@@ -154,10 +176,10 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 
 		final Resource containerResource1 = Resource.newInstance(100, 5);
 		final Resource containerResource2 = Resource.newInstance(100, 10);
-		final Resource containerResource3 = Resource.newInstance(200, 5);
+		final Resource containerResource3 = Resource.newInstance(150, 5);
 
 		final Resource containerResource4 = Resource.newInstance(100, 1);
-		final Resource containerResource5 = Resource.newInstance(200, 1);
+		final Resource containerResource5 = Resource.newInstance(150, 1);
 
 		assertThat(adapter.tryComputeContainerResource(workerSpec1).get(), is(containerResource1));
 		assertThat(adapter.tryComputeContainerResource(workerSpec2).get(), is(containerResource2));
@@ -177,6 +199,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 		final int minVcore = 1;
 		final int maxMemMB = 1000;
 		final int maxVcore = 10;
+		final int unitMemMB = 100;
+		final int unitVcore = 1;
 		final WorkerSpecContainerResourceAdapter adapter =
 			new WorkerSpecContainerResourceAdapter(
 				getConfigProcessSpecEqualsWorkerSpec(),
@@ -184,6 +208,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 				minVcore,
 				maxMemMB,
 				maxVcore,
+				unitMemMB,
+				unitVcore,
 				Collections.emptyMap());
 
 		final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder()
@@ -211,6 +237,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 			WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
 		final int minMemMB = 1;
 		final int minVcore = 1;
+		final int unitMemMB = 1;
+		final int unitVcore = 1;
 
 		final WorkerSpecContainerResourceAdapter adapter =
 			new WorkerSpecContainerResourceAdapter(
@@ -219,6 +247,8 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
 				minVcore,
 				Integer.MAX_VALUE,
 				Integer.MAX_VALUE,
+				unitMemMB,
+				unitVcore,
 				Collections.emptyMap());
 
 		final WorkerResourceSpec workerSpec = new WorkerResourceSpec.Builder()