You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/11/15 00:24:01 UTC

[hadoop] branch branch-2 updated (d1b91ed -> 235d6c7)

This is an automated email from the ASF dual-hosted git repository.

jhung pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from d1b91ed  HDFS-14958. TestBalancerWithNodeGroup is not using NetworkTopologyWithNodeGroup. Contributed by Jim Brennan.
     new b5128b2a YARN-7739. DefaultAMSProcessor should properly check customized resource types against minimum/maximum allocation. (wangda)
     new 7f1d2e1  YARN-7541. Node updates don't update the maximum cluster capability for resources other than CPU and memory
     new 235d6c7  YARN-8202. DefaultAMSProcessor should properly check units of requested custom resource types against minimum/maximum allocation (snemeth via rkanter)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../v2/app/rm/ContainerRequestCreator.java         |  57 +++
 .../v2/app/rm/TestRMContainerAllocator.java        | 533 +++++++++++----------
 .../apache/hadoop/yarn/api/records/Resource.java   |  23 +-
 .../hadoop/yarn/util/UnitsConversionUtil.java      |  44 +-
 .../hadoop/yarn/util/resource/ResourceUtils.java   |  29 ++
 .../resourcetypes/ResourceTypesTestHelper.java     |  93 ++++
 .../hadoop/yarn/server/utils/BuilderUtils.java     |   8 +-
 .../scheduler/ClusterNodeTracker.java              |  75 ++-
 .../resourcemanager/scheduler/SchedulerUtils.java  |  97 +++-
 .../yarn/server/resourcemanager/MockNodes.java     |   4 +
 .../TestApplicationMasterService.java              | 292 ++++++++++-
 .../scheduler/TestClusterNodeTracker.java          | 125 ++++-
 .../scheduler/TestSchedulerUtils.java              | 279 ++++++++++-
 .../scheduler/capacity/TestUtils.java              |   8 +-
 14 files changed, 1341 insertions(+), 326 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/03: YARN-7739. DefaultAMSProcessor should properly check customized resource types against minimum/maximum allocation. (wangda)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b5128b2ae26588f1f5e293c29ca12228e0c464f5
Author: Wangda Tan <wa...@apache.org>
AuthorDate: Mon Feb 12 10:29:37 2018 +0800

    YARN-7739. DefaultAMSProcessor should properly check customized resource types against minimum/maximum allocation. (wangda)
    
    Change-Id: I10cc9341237d9a2fc0f8c855efb98a36b91389e2
    (cherry picked from commit d02e42cee4a08a47ed2835f7a4a100daaa95833f)
---
 .../resourcemanager/scheduler/SchedulerUtils.java  |  36 ++--
 .../TestApplicationMasterService.java              | 182 +++++++++++++++++++++
 .../scheduler/capacity/TestUtils.java              |   8 +-
 3 files changed, 206 insertions(+), 20 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index f8fe979..4d6726d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
@@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -289,23 +291,23 @@ public class SchedulerUtils {
   private static void validateResourceRequest(ResourceRequest resReq,
       Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
       throws InvalidResourceRequestException {
-    if (resReq.getCapability().getMemorySize() < 0 ||
-        resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) {
-      throw new InvalidResourceRequestException("Invalid resource request"
-          + ", requested memory < 0"
-          + ", or requested memory > max configured"
-          + ", requestedMemory=" + resReq.getCapability().getMemorySize()
-          + ", maxMemory=" + maximumResource.getMemorySize());
-    }
-    if (resReq.getCapability().getVirtualCores() < 0 ||
-        resReq.getCapability().getVirtualCores() >
-        maximumResource.getVirtualCores()) {
-      throw new InvalidResourceRequestException("Invalid resource request"
-          + ", requested virtual cores < 0"
-          + ", or requested virtual cores > max configured"
-          + ", requestedVirtualCores="
-          + resReq.getCapability().getVirtualCores()
-          + ", maxVirtualCores=" + maximumResource.getVirtualCores());
+    Resource requestedResource = resReq.getCapability();
+    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+      ResourceInformation reqRI = requestedResource.getResourceInformation(i);
+      ResourceInformation maxRI = maximumResource.getResourceInformation(i);
+      if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
+        throw new InvalidResourceRequestException(
+            "Invalid resource request, requested resource type=[" + reqRI
+                .getName()
+                + "] < 0 or greater than maximum allowed allocation. Requested "
+                + "resource=" + requestedResource
+                + ", maximum allowed allocation=" + maximumResource
+                + ", please note that maximum allowed allocation is calculated "
+                + "by scheduler based on maximum resource of registered "
+                + "NodeManagers, which might be less than configured "
+                + "maximum allocation=" + ResourceUtils
+                .getResourceTypesMaximumAllocation());
+      }
     }
     String labelExp = resReq.getNodeLabelExpression();
     // we don't allow specify label expression other than resourceName=ANY now
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index aa0085b..ff976d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
 import org.apache.hadoop.yarn.api.protocolrecords
     .RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -48,11 +52,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -62,13 +69,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
@@ -665,6 +677,176 @@ public class TestApplicationMasterService {
     rm.stop();
   }
 
+  @Test(timeout = 300000)
+  public void testCSValidateRequestCapacityAgainstMinMaxAllocation()
+      throws Exception {
+    testValidateRequestCapacityAgainstMinMaxAllocation(CapacityScheduler.class);
+  }
+
+  @Test(timeout = 300000)
+  public void testFSValidateRequestCapacityAgainstMinMaxAllocation()
+      throws Exception {
+    testValidateRequestCapacityAgainstMinMaxAllocation(FairScheduler.class);
+  }
+
+  private void testValidateRequestCapacityAgainstMinMaxAllocation(Class<?> schedulerCls)
+      throws Exception {
+
+    // Initialize resource map for 2 types.
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    // Initialize mandatory resources
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    // Don't reset resource types since we have already configured resource
+    // types
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
+        .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+            DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null));
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // Now request resource, memory > allowed
+    boolean exception = false;
+    try {
+      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
+          Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*")
+          .build()), null);
+    } catch (InvalidResourceRequestException e) {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+
+    exception = false;
+    try {
+      // Now request resource, vcore > allowed
+      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
+          Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*")
+          .build()), null);
+    } catch (InvalidResourceRequestException e) {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+
+    rm.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes()
+      throws Exception {
+
+    // Initialize resource map for 3 types.
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    // Initialize mandatory resources
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    ResourceInformation res_1 = ResourceInformation.newInstance("res_1",
+        ResourceInformation.VCORES.getUnits(), 0, 4);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+    riMap.put("res_1", res_1);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    // Don't reset resource types since we have already configured resource
+    // types
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
+        .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+            DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+            ImmutableMap.of("res_1", 4)));
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    Assert.assertEquals(Resource.newInstance(1 * GB, 1),
+        leafQueue.getUsedResources());
+
+    // Now request resource, memory > allowed
+    boolean exception = false;
+    try {
+      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
+          TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1)))
+          .numContainers(1).resourceName("*").build()), null);
+    } catch (InvalidResourceRequestException e) {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+
+    exception = false;
+    try {
+      // Now request resource, vcore > allowed
+      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
+          TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1)))
+          .numContainers(1).resourceName("*")
+          .build()), null);
+    } catch (InvalidResourceRequestException e) {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+
+    exception = false;
+    try {
+      // Now request resource, res_1 > allowed
+      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
+          TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100)))
+          .numContainers(1).resourceName("*")
+          .build()), null);
+    } catch (InvalidResourceRequestException e) {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+
+    rm.close();
+  }
+
   private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     RMContainer rmContainer = cs.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index cb150e0..1c1e112 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -467,9 +467,11 @@ public class TestUtils {
   public static Resource createResource(long memory, int vcores,
       Map<String, Integer> nameToValues) {
     Resource res = Resource.newInstance(memory, vcores);
-    for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
-      res.setResourceInformation(entry.getKey(), ResourceInformation
-          .newInstance(entry.getKey(), "", entry.getValue()));
+    if (nameToValues != null) {
+      for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
+        res.setResourceInformation(entry.getKey(), ResourceInformation
+            .newInstance(entry.getKey(), "", entry.getValue()));
+      }
     }
     return res;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/03: YARN-7541. Node updates don't update the maximum cluster capability for resources other than CPU and memory

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7f1d2e196c2a63f0353f50668efe24c1c8116b9e
Author: Daniel Templeton <te...@apache.org>
AuthorDate: Wed Nov 29 10:36:19 2017 -0800

    YARN-7541. Node updates don't update the maximum cluster capability for resources other than CPU and memory
    
    (cherry picked from commit 8498d287cd3beddcf8fe19625227e09982ec4be2)
---
 .../apache/hadoop/yarn/api/records/Resource.java   |  23 +++-
 .../hadoop/yarn/util/resource/ResourceUtils.java   |  29 +++++
 .../scheduler/ClusterNodeTracker.java              |  75 +++++++++----
 .../yarn/server/resourcemanager/MockNodes.java     |   4 +
 .../scheduler/TestClusterNodeTracker.java          | 125 ++++++++++++++++++++-
 5 files changed, 228 insertions(+), 28 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index ce3ea13..7182da5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -21,9 +21,8 @@ package org.apache.hadoop.yarn.api.records;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -85,6 +84,26 @@ public abstract class Resource implements Comparable<Resource> {
     return new LightWeightResource(memory, vCores);
   }
 
+  /**
+   * Create a new {@link Resource} instance with the given CPU and memory
+   * values and additional resource values as set in the {@code others}
+   * parameter. Note that the CPU and memory settings in the {@code others}
+   * parameter will be ignored.
+   *
+   * @param memory the memory value
+   * @param vCores the CPU value
+   * @param others a map of other resource values indexed by resource name
+   * @return a {@link Resource} instance with the given resource values
+   */
+  @Public
+  @Stable
+  public static Resource newInstance(long memory, int vCores,
+      Map<String, Long> others) {
+    ResourceInformation[] info = ResourceUtils.createResourceTypesArray(others);
+
+    return new LightWeightResource(memory, vCores, info);
+  }
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public static Resource newInstance(Resource resource) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index 4c65cae..0b909a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -619,4 +619,33 @@ public class ResourceUtils {
     return result;
   }
 
+  /**
+   * Create an array of {@link ResourceInformation} objects corresponding to
+   * the passed in map of names to values. The array will be ordered according
+   * to the order returned by {@link #getResourceTypesArray()}. The value of
+   * each resource type in the returned array will either be the value given for
+   * that resource in the {@code res} parameter or, if none is given, 0.
+   *
+   * @param res the map of resource type values
+   * @return an array of {@link ResourceInformation} instances
+   */
+  public static ResourceInformation[] createResourceTypesArray(Map<String,
+      Long> res) {
+    ResourceInformation[] info = new ResourceInformation[resourceTypes.size()];
+
+    for (Entry<String, Integer> entry : RESOURCE_NAME_TO_INDEX.entrySet()) {
+      int index = entry.getValue();
+      Long value = res.get(entry.getKey());
+
+      if (value == null) {
+        value = 0L;
+      }
+
+      info[index] = new ResourceInformation();
+      ResourceInformation.copy(resourceTypesArray[index], info[index]);
+      info[index].setValue(value);
+    }
+
+    return info;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 07b5f5c..4cba902 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -25,11 +25,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -60,11 +63,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private Resource staleClusterCapacity = null;
 
   // Max allocation
-  private long maxNodeMemory = -1;
-  private int maxNodeVCores = -1;
+  private final long[] maxAllocation;
   private Resource configuredMaxAllocation;
   private boolean forceConfiguredMaxAllocation = true;
   private long configuredMaxAllocationWaitTime;
+  private boolean reportedMaxAllocation = false;
+
+  public ClusterNodeTracker() {
+    maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()];
+    Arrays.fill(maxAllocation, -1);
+  }
 
   public void addNode(N node) {
     writeLock.lock();
@@ -215,17 +223,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         forceConfiguredMaxAllocation = false;
       }
 
-      if (forceConfiguredMaxAllocation
-          || maxNodeMemory == -1 || maxNodeVCores == -1) {
+      if (forceConfiguredMaxAllocation || !reportedMaxAllocation) {
         return configuredMaxAllocation;
       }
 
       Resource ret = Resources.clone(configuredMaxAllocation);
-      if (ret.getMemorySize() > maxNodeMemory) {
-        ret.setMemorySize(maxNodeMemory);
-      }
-      if (ret.getVirtualCores() > maxNodeVCores) {
-        ret.setVirtualCores(maxNodeVCores);
+
+      for (int i = 0; i < maxAllocation.length; i++) {
+        ResourceInformation info = ret.getResourceInformation(i);
+
+        if (info.getValue() > maxAllocation[i]) {
+          info.setValue(maxAllocation[i]);
+        }
       }
 
       return ret;
@@ -246,28 +255,50 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
 
   private void updateMaxResources(SchedulerNode node, boolean add) {
     Resource totalResource = node.getTotalResource();
+    ResourceInformation[] totalResources;
+
+    if (totalResource != null) {
+      totalResources = totalResource.getResources();
+    } else {
+      LOG.warn(node.getNodeName() + " reported in with null resources, which "
+          + "indicates a problem in the source code. Please file an issue at "
+          + "https://issues.apache.org/jira/secure/CreateIssue!default.jspa");
+
+      return;
+    }
+
     writeLock.lock();
+
     try {
       if (add) { // added node
-        long nodeMemory = totalResource.getMemorySize();
-        if (nodeMemory > maxNodeMemory) {
-          maxNodeMemory = nodeMemory;
-        }
-        int nodeVCores = totalResource.getVirtualCores();
-        if (nodeVCores > maxNodeVCores) {
-          maxNodeVCores = nodeVCores;
+        // If we add a node, we must have a max allocation for all resource
+        // types
+        reportedMaxAllocation = true;
+
+        for (int i = 0; i < maxAllocation.length; i++) {
+          long value = totalResources[i].getValue();
+
+          if (value > maxAllocation[i]) {
+            maxAllocation[i] = value;
+          }
         }
       } else {  // removed node
-        if (maxNodeMemory == totalResource.getMemorySize()) {
-          maxNodeMemory = -1;
-        }
-        if (maxNodeVCores == totalResource.getVirtualCores()) {
-          maxNodeVCores = -1;
+        boolean recalculate = false;
+
+        for (int i = 0; i < maxAllocation.length; i++) {
+          if (totalResources[i].getValue() == maxAllocation[i]) {
+            // No need to set reportedMaxAllocation to false here because we
+            // will recalculate before we release the lock.
+            maxAllocation[i] = -1;
+            recalculate = true;
+          }
         }
+
         // We only have to iterate through the nodes if the current max memory
         // or vcores was equal to the removed node's
-        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+        if (recalculate) {
           // Treat it like an empty cluster and add nodes
+          reportedMaxAllocation = false;
           for (N n : nodes.values()) {
             updateMaxResources(n, true);
           }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 1309333..de30ade 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -49,6 +49,10 @@ public class MockNodes {
   private static int NODE_ID = 0;
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
+  public static void resetHostIds() {
+    NODE_ID = 0;
+  }
+
   public static List<RMNode> newNodes(int racks, int nodesPerRack,
                                         Resource perNode) {
     List<RMNode> list = Lists.newArrayList();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
index 7f527f1..c1703bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -17,16 +17,21 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.List;
-
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -34,11 +39,15 @@ import static org.junit.Assert.assertEquals;
  * loss of generality.
  */
 public class TestClusterNodeTracker {
-  private ClusterNodeTracker<FSSchedulerNode> nodeTracker =
-      new ClusterNodeTracker<>();
+  private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
 
   @Before
   public void setup() {
+    nodeTracker = new ClusterNodeTracker<>();
+  }
+
+  private void addEight4x4Nodes() {
+    MockNodes.resetHostIds();
     List<RMNode> rmNodes =
         MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
     for (RMNode rmNode : rmNodes) {
@@ -48,6 +57,7 @@ public class TestClusterNodeTracker {
 
   @Test
   public void testGetNodeCount() {
+    addEight4x4Nodes();
     assertEquals("Incorrect number of nodes in the cluster",
         8, nodeTracker.nodeCount());
 
@@ -57,6 +67,7 @@ public class TestClusterNodeTracker {
 
   @Test
   public void testGetNodesForResourceName() throws Exception {
+    addEight4x4Nodes();
     assertEquals("Incorrect number of nodes matching ANY",
         8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size());
 
@@ -66,4 +77,110 @@ public class TestClusterNodeTracker {
     assertEquals("Incorrect number of nodes matching node",
         1, nodeTracker.getNodesByResourceName("host0").size());
   }
+
+  @Test
+  public void testMaxAllowedAllocation() {
+    // Add a third resource
+    Configuration conf = new Configuration();
+
+    conf.set(YarnConfiguration.RESOURCE_TYPES, "test1");
+
+    ResourceUtils.resetResourceTypes(conf);
+    setup();
+
+    Resource maximum = Resource.newInstance(10240, 10,
+        Collections.singletonMap("test1", 10L));
+
+    nodeTracker.setConfiguredMaxAllocation(maximum);
+
+    Resource result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With no nodes added, the ClusterNodeTracker did not return "
+        + "the configured max allocation", maximum, result);
+
+    List<RMNode> smallNodes =
+        MockNodes.newNodes(1, 1, Resource.newInstance(1024, 2,
+            Collections.singletonMap("test1", 4L)));
+    FSSchedulerNode smallNode = new FSSchedulerNode(smallNodes.get(0), false);
+    List<RMNode> mediumNodes =
+        MockNodes.newNodes(1, 1, Resource.newInstance(4096, 2,
+            Collections.singletonMap("test1", 2L)));
+    FSSchedulerNode mediumNode = new FSSchedulerNode(mediumNodes.get(0), false);
+    List<RMNode> largeNodes =
+        MockNodes.newNodes(1, 1, Resource.newInstance(16384, 4,
+            Collections.singletonMap("test1", 1L)));
+    FSSchedulerNode largeNode = new FSSchedulerNode(largeNodes.get(0), false);
+
+    nodeTracker.addNode(mediumNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With a single node added, the ClusterNodeTracker did not "
+        + "return that node's resources as the maximum allocation",
+        mediumNodes.get(0).getTotalCapability(), result);
+
+    nodeTracker.addNode(smallNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With two nodes added, the ClusterNodeTracker did not "
+        + "return a the maximum allocation that was the max of their aggregate "
+        + "resources",
+        Resource.newInstance(4096, 2, Collections.singletonMap("test1", 4L)),
+        result);
+
+    nodeTracker.removeNode(smallNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing a node, the ClusterNodeTracker did not "
+        + "recalculate the adjusted maximum allocation correctly",
+        mediumNodes.get(0).getTotalCapability(), result);
+
+    nodeTracker.addNode(largeNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With two nodes added, the ClusterNodeTracker did not "
+        + "return a the maximum allocation that was the max of their aggregate "
+        + "resources",
+        Resource.newInstance(10240, 4, Collections.singletonMap("test1", 2L)),
+        result);
+
+    nodeTracker.removeNode(largeNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing a node, the ClusterNodeTracker did not "
+        + "recalculate the adjusted maximum allocation correctly",
+        mediumNodes.get(0).getTotalCapability(), result);
+
+    nodeTracker.removeNode(mediumNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+        + "return the configured maximum allocation", maximum, result);
+
+    nodeTracker.addNode(smallNode);
+    nodeTracker.addNode(mediumNode);
+    nodeTracker.addNode(largeNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With three nodes added, the ClusterNodeTracker did not "
+        + "return a the maximum allocation that was the max of their aggregate "
+        + "resources",
+        Resource.newInstance(10240, 4, Collections.singletonMap("test1", 4L)),
+        result);
+
+    nodeTracker.removeNode(smallNode.getNodeID());
+    nodeTracker.removeNode(mediumNode.getNodeID());
+    nodeTracker.removeNode(largeNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+        + "return the configured maximum allocation", maximum, result);
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 03/03: YARN-8202. DefaultAMSProcessor should properly check units of requested custom resource types against minimum/maximum allocation (snemeth via rkanter)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 235d6c73e59a2e59aabae76c56ab5b7d55887516
Author: Robert Kanter <rk...@apache.org>
AuthorDate: Thu May 10 09:31:59 2018 -0700

    YARN-8202. DefaultAMSProcessor should properly check units of requested custom resource types against minimum/maximum allocation (snemeth via rkanter)
    
    (cherry picked from commit c8b53c43644b4ad22d5385c22cad8ed573c0b1ba)
    (cherry picked from commit 0506c762b2f96ccb2c12cce8fd4659536236354e)
---
 .../v2/app/rm/ContainerRequestCreator.java         |  57 +++
 .../v2/app/rm/TestRMContainerAllocator.java        | 533 +++++++++++----------
 .../hadoop/yarn/util/UnitsConversionUtil.java      |  44 +-
 .../resourcetypes/ResourceTypesTestHelper.java     |  93 ++++
 .../hadoop/yarn/server/utils/BuilderUtils.java     |   8 +-
 .../resourcemanager/scheduler/SchedulerUtils.java  |  95 +++-
 .../TestApplicationMasterService.java              | 176 +++++--
 .../scheduler/TestSchedulerUtils.java              | 279 ++++++++++-
 8 files changed, 957 insertions(+), 328 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
new file mode 100644
index 0000000..39a9ddc
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+final class ContainerRequestCreator {
+
+  private ContainerRequestCreator() {}
+
+  static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+          Resource resource, String[] hosts) {
+    return createRequest(jobId, taskAttemptId, resource, hosts,
+            false, false);
+  }
+
+  static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+          Resource resource, String[] hosts, boolean earlierFailedAttempt,
+          boolean reduce) {
+    final TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+            taskAttemptId);
+
+    if (earlierFailedAttempt) {
+      return ContainerRequestEvent
+              .createContainerRequestEventForFailedContainer(attemptId,
+                      resource);
+    }
+    return new ContainerRequestEvent(attemptId, resource, hosts,
+            new String[]{NetworkTopology.DEFAULT_RACK});
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 4ee0a14..af66473 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyFloat;
@@ -201,7 +202,7 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
@@ -213,13 +214,13 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(jobId,
+        1, Resource.newInstance(1024, 1), new String[] {"h1"});
     allocator.sendRequest(event1);
 
     // send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
-        new String[] { "h2" });
+    ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(jobId,
+        2, Resource.newInstance(1024, 1), new String[] {"h2"});
     allocator.sendRequest(event2);
 
     // this tells the scheduler about the requests
@@ -230,8 +231,8 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
 
     // send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h3" });
+    ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(jobId,
+        3, Resource.newInstance(1024, 1), new String[] {"h3"});
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -240,7 +241,7 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
-    
+
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
@@ -250,21 +251,21 @@ public class TestRMContainerAllocator {
     assigned = allocator.schedule();
     rm.drainEvents();
     Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
-    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event2, event3},
         assigned, false);
-    
+
     // check that the assigned container requests are cancelled
     allocator.schedule();
     rm.drainEvents();
     Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
   }
-  
-  @Test 
+
+  @Test
   public void testMapNodeLocality() throws Exception {
-    // test checks that ordering of allocated containers list from the RM does 
-    // not affect the map->container assignment done by the AM. If there is a 
-    // node local container available for a map then it should be assigned to 
-    // that container and not a rack-local container that happened to be seen 
+    // test checks that ordering of allocated containers list from the RM does
+    // not affect the map->container assignment done by the AM. If there is a
+    // node local container available for a map then it should be assigned to
+    // that container and not a rack-local container that happened to be seen
     // earlier in the allocated containers list from the RM.
     // Regression test for MAPREDUCE-4893
     LOG.info("Running testMapNodeLocality");
@@ -289,26 +290,29 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
     // add resources to scheduler
-    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps 
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
     rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
     MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
     rm.drainEvents();
 
     // create the container requests for maps
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
+            jobId, 1, Resource.newInstance(1024, 1),
+            new String[]{"h1"});
     allocator.sendRequest(event1);
-    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
+            jobId, 2, Resource.newInstance(1024, 1),
+            new String[]{"h1"});
     allocator.sendRequest(event2);
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h2" });
+    ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(
+            jobId, 3, Resource.newInstance(1024, 1),
+            new String[]{"h2"});
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -321,14 +325,14 @@ public class TestRMContainerAllocator {
     // Node heartbeat from rack-local first. This makes node h3 the first in the
     // list of allocated containers but it should not be assigned to task1.
     nodeManager3.nodeHeartbeat(true);
-    // Node heartbeat from node-local next. This allocates 2 node local 
+    // Node heartbeat from node-local next. This allocates 2 node local
     // containers for task1 and task2. These should be matched with those tasks.
     nodeManager1.nodeHeartbeat(true);
     rm.drainEvents();
 
     assigned = allocator.schedule();
     rm.drainEvents();
-    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event2, event3},
         assigned, false);
     // remove the rack-local assignment that should have happened for task3
     for(TaskAttemptContainerAssignedEvent event : assigned) {
@@ -338,7 +342,7 @@ public class TestRMContainerAllocator {
         break;
       }
     }
-    checkAssignments(new ContainerRequestEvent[] { event1, event2},
+    checkAssignments(new ContainerRequestEvent[] {event1, event2},
         assigned, true);
   }
 
@@ -379,13 +383,15 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
+            jobId, 1, Resource.newInstance(1024, 1),
+        new String[] {"h1"});
     allocator.sendRequest(event1);
 
     // send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
-        new String[] { "h2" });
+    ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
+            jobId, 2, Resource.newInstance(1024, 1),
+        new String[] {"h2"});
     allocator.sendRequest(event2);
 
     // this tells the scheduler about the requests
@@ -402,7 +408,7 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     rm.drainEvents();
-    checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event2},
         assigned, false);
   }
 
@@ -437,15 +443,19 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    final String[] locations = new String[] { host };
-    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    final String[] locations = new String[] {host};
+    allocator.sendRequest(createRequest(jobId, 0,
+            Resource.newInstance(1024, 1),
+            locations, false, true));
     for (int i = 0; i < 1;) {
       rm.drainEvents();
       i += allocator.schedule().size();
       nm.nodeHeartbeat(true);
     }
 
-    allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
+    allocator.sendRequest(createRequest(jobId, 0,
+            Resource.newInstance(1024, 1),
+            locations, true, false));
     while (allocator.getTaskAttemptKillEvents().size() == 0) {
       rm.drainEvents();
       allocator.schedule().size();
@@ -492,9 +502,10 @@ public class TestRMContainerAllocator {
     RMContainerAllocator.ScheduledRequests scheduledRequests =
         allocator.getScheduledRequests();
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+        createRequest(jobId, 1, Resource.newInstance(2048, 1),
+            new String[] {"h1"}, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null,null));
+        new RMContainerRequestor.ContainerRequest(event1, null, null));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -545,9 +556,12 @@ public class TestRMContainerAllocator {
     RMContainerAllocator.ScheduledRequests scheduledRequests =
         allocator.getScheduledRequests();
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+        createRequest(jobId, 1,
+                Resource.newInstance(2048, 1),
+                new String[] {"h1"}, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+        new RMContainerRequestor.ContainerRequest(event1, null,
+                clock.getTime()));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -559,7 +573,7 @@ public class TestRMContainerAllocator {
     clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
     allocator.preemptReducesIfNeeded();
     Assert.assertEquals("The reducer is not preeempted", 1,
-        assignedRequests.preemptionWaitingReduces.size());
+            assignedRequests.preemptionWaitingReduces.size());
   }
 
   @Test(timeout = 30000)
@@ -606,9 +620,12 @@ public class TestRMContainerAllocator {
     RMContainerAllocator.ScheduledRequests scheduledRequests =
         allocator.getScheduledRequests();
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+        createRequest(jobId, 1,
+                Resource.newInstance(2048, 1),
+                new String[] {"h1"}, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+        new RMContainerRequestor.ContainerRequest(event1, null,
+                clock.getTime()));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -649,13 +666,17 @@ public class TestRMContainerAllocator {
         appAttemptId, mockJob, SystemClock.getInstance());
 
     // request to allocate two reduce priority containers
-    final String[] locations = new String[] { host };
-    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    final String[] locations = new String[] {host};
+    allocator.sendRequest(createRequest(jobId, 0,
+            Resource.newInstance(1024, 1),
+            locations, false, true));
     allocator.scheduleAllReduces();
     allocator.makeRemoteRequest();
     nm.nodeHeartbeat(true);
     rm.drainEvents();
-    allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
+    allocator.sendRequest(createRequest(jobId, 1,
+            Resource.newInstance(1024, 1),
+            locations, false, false));
 
     int assignedContainer;
     for (assignedContainer = 0; assignedContainer < 1;) {
@@ -682,7 +703,7 @@ public class TestRMContainerAllocator {
     conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
     ApplicationId appId = ApplicationId.newInstance(1, 1);
     ApplicationAttemptId appAttemptId =
-        ApplicationAttemptId.newInstance(appId, 1);
+            ApplicationAttemptId.newInstance(appId, 1);
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
@@ -704,13 +725,16 @@ public class TestRMContainerAllocator {
 
     // create some map requests
     ContainerRequestEvent reqMapEvents;
-    reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+    reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0,
+            Resource.newInstance(1024, 1), new String[]{"map"});
     allocator.sendRequests(Arrays.asList(reqMapEvents));
 
     // create some reduce requests
     ContainerRequestEvent reqReduceEvents;
     reqReduceEvents =
-        createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+        createRequest(jobId, 0,
+                Resource.newInstance(2048, 1),
+                new String[] {"reduce"}, false, true);
     allocator.sendRequests(Arrays.asList(reqReduceEvents));
     allocator.schedule();
     // verify all of the host-specific asks were sent plus one for the
@@ -881,18 +905,21 @@ public class TestRMContainerAllocator {
 
     // create the container request
     // send MAP request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
-        "h1", "h2" }, true, false);
+    ContainerRequestEvent event1 = createRequest(jobId, 1,
+            Resource.newInstance(2048, 1),
+            new String[] {"h1", "h2"}, true, false);
     allocator.sendRequest(event1);
 
     // send REDUCE request
-    ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
-        new String[] { "h1" }, false, true);
+    ContainerRequestEvent event2 = createRequest(jobId, 2,
+            Resource.newInstance(3000, 1),
+            new String[] {"h1"}, false, true);
     allocator.sendRequest(event2);
 
     // send MAP request
-    ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
-        new String[] { "h3" }, false, false);
+    ContainerRequestEvent event3 = createRequest(jobId, 3,
+            Resource.newInstance(2048, 1),
+            new String[] {"h3"}, false, false);
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -909,7 +936,7 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     rm.drainEvents();
-    checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event3},
         assigned, false);
 
     // validate that no container is assigned to h1 as it doesn't have 2048
@@ -919,10 +946,10 @@ public class TestRMContainerAllocator {
     }
   }
 
-  private static class MyResourceManager extends MockRM {
+  static class MyResourceManager extends MockRM {
 
     private static long fakeClusterTimeStamp = System.currentTimeMillis();
-    
+
     public MyResourceManager(Configuration conf) {
       super(conf);
     }
@@ -953,7 +980,7 @@ public class TestRMContainerAllocator {
     protected ResourceScheduler createScheduler() {
       return new MyFifoScheduler(this.getRMContext());
     }
-    
+
     MyFifoScheduler getMyFifoScheduler() {
       return (MyFifoScheduler) scheduler;
     }
@@ -1219,7 +1246,7 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
-  
+
   @Test
   public void testUpdatedNodes() throws Exception {
     Configuration conf = new Configuration();
@@ -1249,11 +1276,13 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the map container request
-    ContainerRequestEvent event = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event =
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1"});
     allocator.sendRequest(event);
     TaskAttemptId attemptId = event.getAttemptID();
-    
+
     TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
     when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId());
     Task mockTask = mock(Task.class);
@@ -1277,7 +1306,7 @@ public class TestRMContainerAllocator {
     // no updated nodes reported
     Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
     Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
-    
+
     // mark nodes bad
     nm1.nodeHeartbeat(false);
     nm2.nodeHeartbeat(false);
@@ -1290,11 +1319,13 @@ public class TestRMContainerAllocator {
     // updated nodes are reported
     Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
     Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
-    Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
-    Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
+    Assert.assertEquals(2,
+        allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
+    Assert.assertEquals(attemptId,
+        allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
     allocator.getJobUpdatedNodeEvents().clear();
     allocator.getTaskAttemptKillEvents().clear();
-    
+
     assigned = allocator.schedule();
     rm.drainEvents();
     Assert.assertEquals(0, assigned.size());
@@ -1305,7 +1336,7 @@ public class TestRMContainerAllocator {
 
   @Test
   public void testBlackListedNodes() throws Exception {
-    
+
     LOG.info("Running testBlackListedNodes");
 
     Configuration conf = new Configuration();
@@ -1313,7 +1344,7 @@ public class TestRMContainerAllocator {
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
     conf.setInt(
         MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
-    
+
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
 
@@ -1329,7 +1360,7 @@ public class TestRMContainerAllocator {
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
     rm.drainEvents();
-    
+
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
@@ -1345,18 +1376,24 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 =
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1"});
     allocator.sendRequest(event1);
 
     // send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
-        new String[] { "h2" });
+    ContainerRequestEvent event2 =
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h2"});
     allocator.sendRequest(event2);
 
     // send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h3" });
+    ContainerRequestEvent event3 =
+            ContainerRequestCreator.createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h3"});
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -1366,9 +1403,9 @@ public class TestRMContainerAllocator {
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Send events to blacklist nodes h1 and h2
-    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);            
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
     allocator.sendFailure(f1);
-    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);            
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
     allocator.sendFailure(f2);
 
     // update resources in scheduler
@@ -1390,23 +1427,23 @@ public class TestRMContainerAllocator {
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-        
+
     Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
-    
+
     // validate that all containers are assigned to h3
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
       Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
           .getContainer().getNodeId().getHost()));
     }
   }
-  
+
   @Test
   public void testIgnoreBlacklisting() throws Exception {
     LOG.info("Running testIgnoreBlacklisting");
@@ -1446,7 +1483,7 @@ public class TestRMContainerAllocator {
 
     // Known=1, blacklisted=0, ignore should be false - assign first container
     assigned =
-        getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 1, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
@@ -1461,47 +1498,47 @@ public class TestRMContainerAllocator {
     // Because makeRemoteRequest will not be aware of it until next call
     // The current call will send blacklisted node "h1" to RM
     assigned =
-        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 2, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 1, 0, 0, 1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=1, blacklisted=1, ignore should be true - assign 1
     assigned =
-        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 2, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
-        getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+        getContainerOnHost(jobId, 3, 1024, new String[] {"h2"},
             nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
-        getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+        getContainerOnHost(jobId, 4, 1024, new String[] {"h3"},
             nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Known=3, blacklisted=1, ignore should be true - assign 1
     assigned =
-        getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 5, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
     assigned =
-        getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+        getContainerOnHost(jobId, 6, 1024, new String[] {"h4"},
             nodeManagers[3], allocator, 0, 0, 1, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklisting re-enabled.
     // Known=4, blacklisted=1, ignore should be false - no assignment on h1
     assigned =
-        getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 7, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     // RMContainerRequestor would have created a replacement request.
@@ -1514,20 +1551,20 @@ public class TestRMContainerAllocator {
     // Known=4, blacklisted=2, ignore should be true. Should assign 0
     // container for the same reason above.
     assigned =
-        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 8, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 1, 0, 0, 2, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true. Should assign 2
     // containers.
     assigned =
-        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 8, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true.
     assigned =
-        getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+        getContainerOnHost(jobId, 9, 1024, new String[] {"h2"},
             nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
@@ -1538,23 +1575,23 @@ public class TestRMContainerAllocator {
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=5, blacklisted=3, ignore should be true.
     assigned =
-        getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+        getContainerOnHost(jobId, 10, 1024, new String[] {"h3"},
             nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
-    
+
     // Assign on 5 more nodes - to re-enable blacklisting
     for (int i = 0; i < 5; i++) {
       nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
       assigned =
           getContainerOnHost(jobId, 11 + i, 1024,
-              new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+              new String[] {String.valueOf(5 + i)}, nodeManagers[4 + i],
               allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
       Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     }
 
     // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
     assigned =
-        getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+        getContainerOnHost(jobId, 20, 1024, new String[] {"h3"},
             nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
   }
@@ -1574,7 +1611,8 @@ public class TestRMContainerAllocator {
           int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
           throws Exception {
     ContainerRequestEvent reqEvent =
-        createReq(jobId, taskAttemptId, memory, hosts);
+            ContainerRequestCreator.createRequest(jobId, taskAttemptId,
+                    Resource.newInstance(memory, 1), hosts);
     allocator.sendRequest(reqEvent);
 
     // Send the request to the RM
@@ -1594,7 +1632,7 @@ public class TestRMContainerAllocator {
         expectedAdditions2, expectedRemovals2, rm);
     return assigned;
   }
- 
+
   @Test
   public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
     LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
@@ -1604,7 +1642,7 @@ public class TestRMContainerAllocator {
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
     conf.setInt(
         MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
-    
+
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
 
@@ -1620,7 +1658,7 @@ public class TestRMContainerAllocator {
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
     rm.drainEvents();
-    
+
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
@@ -1636,8 +1674,10 @@ public class TestRMContainerAllocator {
 
     LOG.info("Requesting 1 Containers _1 on H1");
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 =
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1"});
     allocator.sendRequest(event1);
 
     LOG.info("RM Heartbeat (to send the container requests)");
@@ -1651,13 +1691,13 @@ public class TestRMContainerAllocator {
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
-    
+
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());    
-    
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
     LOG.info("Failing container _1 on H1 (should blacklist the node)");
     // Send events to blacklist nodes h1 and h2
     ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
@@ -1665,8 +1705,9 @@ public class TestRMContainerAllocator {
 
     //At this stage, a request should be created for a fast fail map
     //Create a FAST_FAIL request for a previously failed map.
-    ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
-        new String[] { "h1" }, true, false);
+    ContainerRequestEvent event1f = createRequest(jobId, 1,
+            Resource.newInstance(1024, 1),
+            new String[] {"h1"}, true, false);
     allocator.sendRequest(event1f);
 
     //Update the Scheduler with the new requests.
@@ -1676,24 +1717,26 @@ public class TestRMContainerAllocator {
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h1", "h3" });
+    ContainerRequestEvent event3 =
+            ContainerRequestCreator.createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1", "h3"});
     allocator.sendRequest(event3);
-    
+
     //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
     //RM is only aware of the prio:5 container
-    
+
     LOG.info("h1 Heartbeat (To actually schedule the containers)");
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
-    
+
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
-    
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
     //RMContainerAllocator gets assigned a p:5 on a blacklisted node.
 
     //Send a release for the p:5 container + another request.
@@ -1702,26 +1745,26 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-    
+
     //Hearbeat from H3 to schedule on this host.
     LOG.info("h3 Heartbeat (To re-schedule the containers)");
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
-    
+
     LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
     assigned = allocator.schedule();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     rm.drainEvents();
-     
+
     // For debugging
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
       LOG.info(assig.getTaskAttemptID() +
           " assgined to " + assig.getContainer().getId() +
           " with priority " + assig.getContainer().getPriority());
     }
-    
+
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
-    
+
     // validate that all containers are assigned to h3
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
       Assert.assertEquals("Assigned container " + assig.getContainer().getId()
@@ -1756,13 +1799,13 @@ public class TestRMContainerAllocator {
         assert (false);
       }
     }
-    
+
     List<ResourceRequest> lastAsk = null;
     List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
     Resource forceResourceLimit = null;
-    
+
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
     @Override
@@ -1851,38 +1894,6 @@ public class TestRMContainerAllocator {
     }
   }
 
-  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
-      int memory, String[] hosts) {
-    return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
-  }
-
-  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
-      int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
-    return createReq(jobId, taskAttemptId, mem,
-        1, hosts, earlierFailedAttempt, reduce);
-  }
-
-  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
-      int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
-      boolean reduce) {
-    TaskId taskId;
-    if (reduce) {
-      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
-    } else {
-      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
-    }
-    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
-        taskAttemptId);
-    Resource containerNeed = Resource.newInstance(memory, vcore);
-    if (earlierFailedAttempt) {
-      return ContainerRequestEvent
-          .createContainerRequestEventForFailedContainer(attemptId,
-              containerNeed);
-    }
-    return new ContainerRequestEvent(attemptId, containerNeed, hosts,
-        new String[] { NetworkTopology.DEFAULT_RACK });
-  }
-
   private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
       String host, boolean reduce) {
     TaskId taskId;
@@ -1893,9 +1904,9 @@ public class TestRMContainerAllocator {
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
         taskAttemptId);
-    return new ContainerFailedEvent(attemptId, host);    
+    return new ContainerFailedEvent(attemptId, host);
   }
-  
+
   private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
       int taskAttemptId, boolean reduce) {
     TaskId taskId;
@@ -1953,14 +1964,14 @@ public class TestRMContainerAllocator {
 
   // Mock RMContainerAllocator
   // Instead of talking to remote Scheduler,uses the local Scheduler
-  private static class MyContainerAllocator extends RMContainerAllocator {
-    static final List<TaskAttemptContainerAssignedEvent> events
-      = new ArrayList<TaskAttemptContainerAssignedEvent>();
-    static final List<TaskAttemptKillEvent> taskAttemptKillEvents 
-      = new ArrayList<TaskAttemptKillEvent>();
-    static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
-    = new ArrayList<JobUpdatedNodesEvent>();
-    static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
+  static class MyContainerAllocator extends RMContainerAllocator {
+    static final List<TaskAttemptContainerAssignedEvent> events =
+        new ArrayList<>();
+    static final List<TaskAttemptKillEvent> taskAttemptKillEvents =
+        new ArrayList<>();
+    static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents =
+        new ArrayList<>();
+    static final List<JobEvent> jobEvents = new ArrayList<>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
     private AllocateResponse allocateResponse;
@@ -2063,7 +2074,7 @@ public class TestRMContainerAllocator {
     }
 
     public void sendRequest(ContainerRequestEvent req) {
-      sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+      sendRequests(Arrays.asList(new ContainerRequestEvent[] {req}));
     }
 
     public void sendRequests(List<ContainerRequestEvent> reqs) {
@@ -2075,7 +2086,7 @@ public class TestRMContainerAllocator {
     public void sendFailure(ContainerFailedEvent f) {
       super.handleEvent(f);
     }
-    
+
     public void sendDeallocate(ContainerAllocatorEvent f) {
       super.handleEvent(f);
     }
@@ -2093,16 +2104,15 @@ public class TestRMContainerAllocator {
       // run the scheduler
       super.heartbeat();
 
-      List<TaskAttemptContainerAssignedEvent> result
-        = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+      List<TaskAttemptContainerAssignedEvent> result = new ArrayList<>(events);
       events.clear();
       return result;
     }
-    
+
     static List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
       return taskAttemptKillEvents;
     }
-    
+
     static List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
       return jobUpdatedNodeEvents;
     }
@@ -2111,12 +2121,12 @@ public class TestRMContainerAllocator {
     protected void startAllocatorThread() {
       // override to NOT start thread
     }
-    
+
     @Override
     protected boolean isApplicationMasterRegistered() {
       return super.isApplicationMasterRegistered();
     }
-    
+
     public boolean isUnregistered() {
       return isUnregistered;
     }
@@ -2158,7 +2168,7 @@ public class TestRMContainerAllocator {
     int numPendingReduces = 4;
     float maxReduceRampupLimit = 0.5f;
     float reduceSlowStart = 0.2f;
-    
+
     RMContainerAllocator allocator = mock(RMContainerAllocator.class);
     doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(),
         anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class),
@@ -2168,14 +2178,14 @@ public class TestRMContainerAllocator {
 
     // Test slow-start
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, never()).setIsReduceStarted(true);
-    
+
     // verify slow-start still in effect when no more maps need to
     // be scheduled but some have yet to complete
     allocator.scheduleReduces(
@@ -2191,23 +2201,23 @@ public class TestRMContainerAllocator {
     succeededMaps = 3;
     doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, times(1)).setIsReduceStarted(true);
-    
+
     // Test reduce ramp-up
     doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator)
       .getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampUpReduces(anyInt());
     verify(allocator, never()).rampDownReduces(anyInt());
@@ -2226,18 +2236,18 @@ public class TestRMContainerAllocator {
     verify(allocator).rampDownReduces(anyInt());
 
     // Test reduce ramp-down for when there are scheduled maps
-    // Since we have two scheduled Maps, rampDownReducers 
+    // Since we have two scheduled Maps, rampDownReducers
     // should be invoked twice.
     scheduledMaps = 2;
     assignedReduces = 2;
     doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
       .getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, times(2)).rampDownReduces(anyInt());
 
@@ -2282,7 +2292,7 @@ public class TestRMContainerAllocator {
       recalculatedReduceSchedule = true;
     }
   }
-  
+
   @Test
   public void testCompletedTasksRecalculateSchedule() throws Exception {
     LOG.info("Running testCompletedTasksRecalculateSchedule");
@@ -2392,31 +2402,33 @@ public class TestRMContainerAllocator {
   public void testCompletedContainerEvent() {
     RMContainerAllocator allocator = new RMContainerAllocator(
         mock(ClientService.class), mock(AppContext.class));
-    
+
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
         MRBuilderUtils.newTaskId(
             MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);
-    ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
-        applicationId, 1);
-    ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainerId containerId =
+        ContainerId.newContainerId(applicationAttemptId, 1);
     ContainerStatus status = ContainerStatus.newInstance(
         containerId, ContainerState.RUNNING, "", 0);
 
     ContainerStatus abortedStatus = ContainerStatus.newInstance(
         containerId, ContainerState.RUNNING, "",
         ContainerExitStatus.ABORTED);
-    
+
     TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
         attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
         event.getType());
-    
+
     TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
         abortedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
-    
-    ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2);
+
+    ContainerId containerId2 =
+        ContainerId.newContainerId(applicationAttemptId, 2);
     ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
         ContainerState.RUNNING, "", 0);
 
@@ -2432,7 +2444,7 @@ public class TestRMContainerAllocator {
         preemptedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
   }
-  
+
   @Test
   public void testUnregistrationOnlyIfRegistered() throws Exception {
     Configuration conf = new Configuration();
@@ -2475,7 +2487,7 @@ public class TestRMContainerAllocator {
     mrApp.stop();
     Assert.assertTrue(allocator.isUnregistered());
   }
-  
+
   // Step-1 : AM send allocate request for 2 ContainerRequests and 1
   // blackListeNode
   // Step-2 : 2 containers are allocated by RM.
@@ -2534,11 +2546,15 @@ public class TestRMContainerAllocator {
     // create the container request
     // send MAP request
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
 
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+        ContainerRequestCreator.createRequest(jobId, 2,
+                Resource.newInstance(2048, 1),
+                new String[] {"h1", "h2"});
     allocator.sendRequest(event2);
 
     // Send events to blacklist h2
@@ -2576,7 +2592,9 @@ public class TestRMContainerAllocator {
     // RM
     // send container request
     ContainerRequestEvent event3 =
-        createReq(jobId, 3, 1000, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 3,
+                    Resource.newInstance(1000, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event3);
 
     // send deallocate request
@@ -2620,7 +2638,9 @@ public class TestRMContainerAllocator {
     allocator.sendFailure(f2);
 
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+            ContainerRequestCreator.createRequest(jobId, 4,
+                    Resource.newInstance(2000, 1),
+                    new String[]{"h1", "h2"});
     allocator.sendRequest(event4);
 
     // send allocate request to 2nd RM and get resync command
@@ -2631,7 +2651,9 @@ public class TestRMContainerAllocator {
     // asks,release,blacklistAaddition
     // and another containerRequest(event5)
     ContainerRequestEvent event5 =
-        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+            ContainerRequestCreator.createRequest(jobId, 5,
+                    Resource.newInstance(3000, 1),
+                    new String[]{"h1", "h2", "h3"});
     allocator.sendRequest(event5);
 
     // send all outstanding request again.
@@ -2688,9 +2710,10 @@ public class TestRMContainerAllocator {
       }
     };
 
-    ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
-        (int) (maxContainerSupported.getMemorySize() + 10),
-        maxContainerSupported.getVirtualCores(),
+    final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
+    ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0,
+            Resource.newInstance(memory,
+            maxContainerSupported.getVirtualCores()),
         new String[0], false, false);
     allocator.sendRequests(Arrays.asList(mapRequestEvt));
     allocator.schedule();
@@ -2726,10 +2749,11 @@ public class TestRMContainerAllocator {
       }
     };
 
-    ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
-        (int) (maxContainerSupported.getMemorySize() + 10),
-        maxContainerSupported.getVirtualCores(),
-        new String[0], false, true);
+    final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
+    ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0,
+            Resource.newInstance(memory,
+            maxContainerSupported.getVirtualCores()),
+            new String[0], false, true);
     allocator.sendRequests(Arrays.asList(reduceRequestEvt));
     // Reducer container requests are added to the pending queue upon request,
     // schedule all reducers here so that we can observe if reducer requests
@@ -2779,8 +2803,9 @@ public class TestRMContainerAllocator {
     rm1.drainEvents();
     Assert.assertEquals("Should Have 1 Job Event", 1,
         allocator.jobEvents.size());
-    JobEvent event = allocator.jobEvents.get(0); 
-    Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT));
+    JobEvent event = allocator.jobEvents.get(0);
+    Assert.assertTrue("Should Reboot",
+        event.getType().equals(JobEventType.JOB_AM_REBOOT));
   }
 
   @Test(timeout=60000)
@@ -2912,7 +2937,9 @@ public class TestRMContainerAllocator {
     // create some map requests
     ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
     for (int i = 0; i < reqMapEvents.length; ++i) {
-      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+      reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
+              Resource.newInstance(1024, 1),
+              new String[] {"h" + i});
     }
     allocator.sendRequests(Arrays.asList(reqMapEvents));
     // create some reduce requests
@@ -2920,7 +2947,8 @@ public class TestRMContainerAllocator {
         new ContainerRequestEvent[REDUCE_COUNT];
     for (int i = 0; i < reqReduceEvents.length; ++i) {
       reqReduceEvents[i] =
-          createReq(jobId, i, 1024, new String[] {}, false, true);
+          createRequest(jobId, i, Resource.newInstance(1024, 1),
+                  new String[] {}, false, true);
     }
     allocator.sendRequests(Arrays.asList(reqReduceEvents));
     allocator.schedule();
@@ -2967,14 +2995,17 @@ public class TestRMContainerAllocator {
     // create some map requests
     ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
     for (int i = 0; i < reqMapEvents.length; ++i) {
-      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+      reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
+          Resource.newInstance(1024, 1), new String[] {"h" + i});
     }
     allocator.sendRequests(Arrays.asList(reqMapEvents));
     // create some reduce requests
-    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
+    ContainerRequestEvent[] reqReduceEvents =
+        new ContainerRequestEvent[REDUCE_COUNT];
     for (int i = 0; i < reqReduceEvents.length; ++i) {
-      reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
-          false, true);
+      reqReduceEvents[i] =
+          createRequest(jobId, i, Resource.newInstance(1024, 1),
+              new String[] {}, false, true);
     }
     allocator.sendRequests(Arrays.asList(reqReduceEvents));
     allocator.schedule();
@@ -3129,13 +3160,19 @@ public class TestRMContainerAllocator {
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 1024, new String[] { "h2" });
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"});
     allocator.sendRequest(event2);
     ContainerRequestEvent event3 =
-        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+            createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"}, false, true);
     allocator.sendRequest(event3);
 
     // This will tell the scheduler about the requests but there will be no
@@ -3148,7 +3185,8 @@ public class TestRMContainerAllocator {
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+        createRequest(jobId, 4, Resource.newInstance(1024, 1),
+                new String[] {"h3"}, false, true);
     allocator.sendRequest(event4);
 
     allocator.schedule();
@@ -3292,13 +3330,18 @@ public class TestRMContainerAllocator {
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 1024, new String[] { "h2" });
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"});
     allocator.sendRequest(event2);
     ContainerRequestEvent event3 =
-        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+            createRequest(jobId, 3, Resource.newInstance(1024, 1),
+                    new String[]{"h2"}, false, true);
     allocator.sendRequest(event3);
 
     // This will tell the scheduler about the requests but there will be no
@@ -3311,7 +3354,8 @@ public class TestRMContainerAllocator {
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+            createRequest(jobId, 4, Resource.newInstance(1024, 1),
+                    new String[]{"h3"}, false, true);
     allocator.sendRequest(event4);
 
     allocator.schedule();
@@ -3424,13 +3468,19 @@ public class TestRMContainerAllocator {
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 1024, new String[] { "h2" });
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"});
     allocator.sendRequest(event2);
     ContainerRequestEvent event3 =
-         createReq(jobId, 3, 1024, new String[] { "h1" }, false, true);
+            createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"}, false, true);
     allocator.sendRequest(event3);
 
     // This will tell the scheduler about the requests but there will be no
@@ -3440,7 +3490,8 @@ public class TestRMContainerAllocator {
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+        createRequest(jobId, 4, Resource.newInstance(1024, 1),
+                new String[] {"h3"}, false, true);
     allocator.sendRequest(event4);
 
     allocator.schedule();
@@ -3477,7 +3528,9 @@ public class TestRMContainerAllocator {
 
     // Send request for one more mapper.
     ContainerRequestEvent event5 =
-        createReq(jobId, 5, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 5,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event5);
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
@@ -3519,7 +3572,7 @@ public class TestRMContainerAllocator {
       return RegisterApplicationMasterResponse.newInstance(
           Resource.newInstance(512, 1),
           Resource.newInstance(512000, 1024),
-          Collections.<ApplicationAccessType,String>emptyMap(),
+          Collections.<ApplicationAccessType, String>emptyMap(),
           ByteBuffer.wrap("fake_key".getBytes()),
           Collections.<Container>emptyList(),
           "default",
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
index 7a212e1..1da2fed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
@@ -175,16 +175,8 @@ public class UnitsConversionUtil {
    */
   public static int compare(String unitA, long valueA, String unitB,
       long valueB) {
-    if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA)
-        || !KNOWN_UNITS.contains(unitB)) {
-      throw new IllegalArgumentException("Units cannot be null");
-    }
-    if (!KNOWN_UNITS.contains(unitA)) {
-      throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
-    }
-    if (!KNOWN_UNITS.contains(unitB)) {
-      throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
-    }
+    checkUnitArgument(unitA);
+    checkUnitArgument(unitB);
     if (unitA.equals(unitB)) {
       return Long.compare(valueA, valueB);
     }
@@ -218,4 +210,36 @@ public class UnitsConversionUtil {
       return tmpA.compareTo(tmpB);
     }
   }
+
+  /** Throws IllegalArgumentException if unit is null or not a known unit. */
+  private static void checkUnitArgument(String unit) {
+    if (unit == null) {
+      throw new IllegalArgumentException("Unit cannot be null");
+    } else if (!KNOWN_UNITS.contains(unit)) {
+      throw new IllegalArgumentException("Unknown unit '" + unit + "'");
+    }
+  }
+
+  /**
+   * Compare a unit to another unit.
+   * <br>
+   * Examples:<br>
+   * 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k")
+   * will return -1.<br>
+   * 2. 'M' (mega) is greater than 'k' (kilo), so compareUnits("M", "k") will
+   * return 1.
+   *
+   * @param unitA first unit
+   * @param unitB second unit
+   * @return -1, 0 or +1 depending on whether unitA is smaller than,
+   * equal to or greater than unitB.
+   */
+  public static int compareUnits(String unitA, String unitB) {
+    checkUnitArgument(unitA);
+    checkUnitArgument(unitB);
+    int unitAPos = SORTED_UNITS.indexOf(unitA);
+    int unitBPos = SORTED_UNITS.indexOf(unitB);
+
+    return Integer.compare(unitAPos, unitBPos);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
new file mode 100644
index 0000000..98a8a00
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.resourcetypes;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Contains helper methods to create Resource and ResourceInformation objects.
+ * ResourceInformation can be created from a resource name
+ * and a resource descriptor as well that comprises amount and unit.
+ */
+public final class ResourceTypesTestHelper {
+
+  // Amount (digits) followed by an optional unit of ASCII letters, e.g. "5G".
+  private static final Pattern RESOURCE_VALUE_AND_UNIT_PATTERN =
+      Pattern.compile("(\\d+)([A-Za-z]*)");
+
+  private ResourceTypesTestHelper() {}
+
+  private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider
+          .getRecordFactory(null);
+
+  /** Parsed amount and unit of a resource descriptor. */
+  private static final class ResourceValueAndUnit {
+    private final long value;
+    private final String unit;
+
+    private ResourceValueAndUnit(long value, String unit) {
+      this.value = value;
+      this.unit = unit;
+    }
+  }
+
+  /**
+   * Creates a Resource; custom values are descriptors like "5G" or "500M".
+   */
+  public static Resource newResource(long memory, int vCores, Map<String,
+          String> customResources) {
+    Resource resource = RECORD_FACTORY.newRecordInstance(Resource.class);
+    resource.setMemorySize(memory);
+    resource.setVirtualCores(vCores);
+
+    for (Map.Entry<String, String> customResource :
+            customResources.entrySet()) {
+      String resourceName = customResource.getKey();
+      resource.setResourceInformation(resourceName,
+          createResourceInformation(resourceName, customResource.getValue()));
+    }
+    return resource;
+  }
+
+  /** Builds a ResourceInformation from a name and a descriptor like "5G". */
+  public static ResourceInformation createResourceInformation(String
+          resourceName, String descriptor) {
+    ResourceValueAndUnit parsed = getResourceValueAndUnit(descriptor);
+    return ResourceInformation.newInstance(resourceName, parsed.unit,
+        parsed.value);
+  }
+
+  /** Splits a descriptor into amount and unit; throws if no amount found. */
+  private static ResourceValueAndUnit getResourceValueAndUnit(String val) {
+    Matcher matcher = RESOURCE_VALUE_AND_UNIT_PATTERN.matcher(val);
+    if (!matcher.find()) {
+      throw new RuntimeException("Invalid pattern of resource descriptor: " +
+              val);
+    }
+    // groupCount() is a property of the pattern (always 2): no check needed.
+    return new ResourceValueAndUnit(Long.parseLong(matcher.group(1)),
+        matcher.group(2));
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 0f0d8b1..cc3bf38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -182,7 +182,7 @@ public class BuilderUtils {
   public static NodeId newNodeId(String host, int port) {
     return NodeId.newInstance(host, port);
   }
-  
+
   public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime) {
@@ -404,7 +404,7 @@ public class BuilderUtils {
     report.setPriority(priority);
     return report;
   }
-  
+
   public static ApplicationSubmissionContext newApplicationSubmissionContext(
       ApplicationId applicationId, String applicationName, String queue,
       Priority priority, ContainerLaunchContext amContainer,
@@ -459,6 +459,10 @@ public class BuilderUtils {
     return resource;
   }
 
+  public static Resource newEmptyResource() {
+    return recordFactory.newRecordInstance(Resource.class);
+  }
+
   public static URL newURL(String scheme, String host, int port, String file) {
     URL url = recordFactory.newRecordInstance(URL.class);
     url.setScheme(scheme);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 4d6726d..4bf3e13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -291,24 +292,10 @@ public class SchedulerUtils {
   private static void validateResourceRequest(ResourceRequest resReq,
       Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
       throws InvalidResourceRequestException {
-    Resource requestedResource = resReq.getCapability();
-    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
-      ResourceInformation reqRI = requestedResource.getResourceInformation(i);
-      ResourceInformation maxRI = maximumResource.getResourceInformation(i);
-      if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
-        throw new InvalidResourceRequestException(
-            "Invalid resource request, requested resource type=[" + reqRI
-                .getName()
-                + "] < 0 or greater than maximum allowed allocation. Requested "
-                + "resource=" + requestedResource
-                + ", maximum allowed allocation=" + maximumResource
-                + ", please note that maximum allowed allocation is calculated "
-                + "by scheduler based on maximum resource of registered "
-                + "NodeManagers, which might be less than configured "
-                + "maximum allocation=" + ResourceUtils
-                .getResourceTypesMaximumAllocation());
-      }
-    }
+    final Resource requestedResource = resReq.getCapability();
+    checkResourceRequestAgainstAvailableResource(requestedResource,
+        maximumResource);
+
     String labelExp = resReq.getNodeLabelExpression();
     // we don't allow specify label expression other than resourceName=ANY now
     if (!ResourceRequest.ANY.equals(resReq.getResourceName())
@@ -346,6 +333,78 @@ public class SchedulerUtils {
     }
   }
 
+  /**
+   * Validates each known resource type of the request against the available
+   * resource, comparing values in the smaller of the two units; throws
+   * InvalidResourceRequestException on a negative or too large request.
+   */
+  @Private
+  @VisibleForTesting
+  static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
+      Resource availableResource) throws InvalidResourceRequestException {
+    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+      final ResourceInformation requestedRI =
+          reqResource.getResourceInformation(i);
+      final String reqResourceName = requestedRI.getName();
+      if (requestedRI.getValue() < 0) {
+        throwInvalidResourceException(reqResource, availableResource,
+            reqResourceName);
+      }
+
+      final ResourceInformation availableRI =
+          availableResource.getResourceInformation(reqResourceName);
+      long requestedResourceValue = requestedRI.getValue();
+      long availableResourceValue = availableRI.getValue();
+      int unitsRelation = UnitsConversionUtil
+          .compareUnits(requestedRI.getUnits(), availableRI.getUnits());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Requested resource information: " + requestedRI);
+        LOG.debug("Available resource information: " + availableRI);
+        LOG.debug("Relation of units: " + unitsRelation);
+      }
+
+      if (unitsRelation < 0) {
+        // requested unit is smaller (e.g. requested "m", available "k")
+        availableResourceValue =
+            UnitsConversionUtil.convert(availableRI.getUnits(),
+                requestedRI.getUnits(), availableRI.getValue());
+      } else if (unitsRelation > 0) {
+        // requested unit is greater (e.g. requested "G", available "M")
+        requestedResourceValue =
+            UnitsConversionUtil.convert(requestedRI.getUnits(),
+                availableRI.getUnits(), requestedRI.getValue());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Requested resource value after conversion: " +
+                requestedResourceValue);
+        LOG.debug("Available resource value after conversion: " +
+                availableResourceValue);
+      }
+
+      if (requestedResourceValue > availableResourceValue) {
+        throwInvalidResourceException(reqResource, availableResource,
+            reqResourceName);
+      }
+    }
+  }
+
+  /** Always throws; the message names the offending resource type. */
+  private static void throwInvalidResourceException(Resource reqResource,
+      Resource availableResource, String reqResourceName)
+      throws InvalidResourceRequestException {
+    throw new InvalidResourceRequestException(
+        "Invalid resource request, requested resource type=[" + reqResourceName
+            + "] < 0 or greater than maximum allowed allocation. Requested "
+            + "resource=" + reqResource + ", maximum allowed allocation="
+            + availableResource + ", please note that maximum allowed "
+            + "allocation is calculated by scheduler based on maximum "
+            + "resource of registered NodeManagers, which might be less "
+            + "than configured maximum allocation="
+            + ResourceUtils.getResourceTypesMaximumAllocation());
+  }
+
   private static void checkQueueLabelInLabelManager(String labelExpression,
       RMContext rmContext) throws InvalidLabelResourceRequestException {
     // check node label manager contains this label
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index ff976d4..729166b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -22,9 +22,13 @@ import static java.lang.Thread.sleep;
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
 
+
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -62,6 +66,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -77,11 +82,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+        .FairSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -367,7 +376,7 @@ public class TestApplicationMasterService {
       am2.addContainerToBeReleased(cId);
       try {
         am2.schedule();
-        Assert.fail("Exception was expected!!");
+        fail("Exception was expected!!");
       } catch (InvalidContainerReleaseException e) {
         StringBuilder sb = new StringBuilder("Cannot release container : ");
         sb.append(cId.toString());
@@ -462,7 +471,7 @@ public class TestApplicationMasterService {
               FinalApplicationStatus.FAILED, "", "");
       try {
         am1.unregisterAppAttempt(req, false);
-        Assert.fail("ApplicationMasterNotRegisteredException should be thrown");
+        fail("ApplicationMasterNotRegisteredException should be thrown");
       } catch (ApplicationMasterNotRegisteredException e) {
         Assert.assertNotNull(e);
         Assert.assertNotNull(e.getMessage());
@@ -470,7 +479,7 @@ public class TestApplicationMasterService {
             "Application Master is trying to unregister before registering for:"
         ));
       } catch (Exception e) {
-        Assert.fail("ApplicationMasterNotRegisteredException should be thrown");
+        fail("ApplicationMasterNotRegisteredException should be thrown");
       }
 
       am1.registerAppAttempt();
@@ -629,9 +638,7 @@ public class TestApplicationMasterService {
       Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
           response.getUpdateErrors().get(0).getReason());
     } finally {
-      if (rm != null) {
-        rm.close();
-      }
+      rm.close();
     }
   }
 
@@ -711,32 +718,46 @@ public class TestApplicationMasterService {
 
     ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
 
-    CapacitySchedulerConfiguration csconf =
-        new CapacitySchedulerConfiguration();
-    csconf.setResourceComparator(DominantResourceCalculator.class);
+    final YarnConfiguration yarnConf;
+    if (schedulerCls.getCanonicalName()
+        .equals(CapacityScheduler.class.getCanonicalName())) {
+      CapacitySchedulerConfiguration csConf =
+          new CapacitySchedulerConfiguration();
+      csConf.setResourceComparator(DominantResourceCalculator.class);
+      yarnConf = new YarnConfiguration(csConf);
+    } else if (schedulerCls.getCanonicalName()
+        .equals(FairScheduler.class.getCanonicalName())) {
+      FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
+      yarnConf = new YarnConfiguration(fsConf);
+    } else {
+      throw new IllegalStateException(
+          "Scheduler class is of wrong type: " + schedulerCls);
+    }
 
-    YarnConfiguration conf = new YarnConfiguration(csconf);
     // Don't reset resource types since we have already configured resource
     // types
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls,
+    yarnConf.setBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES, false);
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls,
         ResourceScheduler.class);
 
-    MockRM rm = new MockRM(conf);
+    MockRM rm = new MockRM(yarnConf);
     rm.start();
 
     MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
         .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
             DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null));
 
-    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
 
     // Now request resource, memory > allowed
     boolean exception = false;
     try {
-      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
-          Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*")
-          .build()), null);
+      am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
+              .capability(Resource.newInstance(9 * GB, 1))
+              .numContainers(1)
+              .resourceName("*")
+              .build()), null);
     } catch (InvalidResourceRequestException e) {
       exception = true;
     }
@@ -744,10 +765,12 @@ public class TestApplicationMasterService {
 
     exception = false;
     try {
-      // Now request resource, vcore > allowed
-      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
-          Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*")
-          .build()), null);
+      // Now request resource, vcores > allowed
+      am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
+              .capability(Resource.newInstance(8 * GB, 18))
+              .numContainers(1)
+              .resourceName("*")
+              .build()), null);
     } catch (InvalidResourceRequestException e) {
       exception = true;
     }
@@ -756,6 +779,71 @@ public class TestApplicationMasterService {
     rm.close();
   }
 
+  /**
+   * A request whose custom resource uses a smaller unit than the node's
+   * (500M requested vs. 5G registered) must pass validation.
+   */
+  @Test(timeout = 300000)
+  public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits()
+      throws Exception {
+    // Initialize resource map for 3 types: memory, vcores and res_1.
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    // Initialize mandatory resources
+    ResourceInformation memory =
+        ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
+            ResourceInformation.MEMORY_MB.getUnits(),
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores =
+        ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
+            ResourceInformation.VCORES.getUnits(),
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+            DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    // Custom resource is configured in "G"; the request below uses "M".
+    ResourceInformation res1 =
+        ResourceInformation.newInstance("res_1", "G", 0, 4);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+    riMap.put("res_1", res1);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
+    YarnConfiguration yarnConf = new YarnConfiguration(fsConf);
+    // Don't reset resource types since we have already configured resource
+    // types
+    yarnConf.setBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES, false);
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(yarnConf);
+    rm.start();
+    try {
+      MockNM nm1 = rm.registerNode("199.99.99.1:1234",
+          ResourceTypesTestHelper.newResource(
+              DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+              DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+              ImmutableMap.of("res_1", "5G")));
+
+      RMApp app1 = rm.submitApp(GB, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      // Now request res_1, 500M < 5G so it should be allowed
+      try {
+        am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
+            .capability(ResourceTypesTestHelper.newResource(4 * GB, 1,
+                ImmutableMap.of("res_1", "500M")))
+            .numContainers(1).resourceName("*").build()), null);
+      } catch (InvalidResourceRequestException e) {
+        fail("Allocate request should be accepted but exception was "
+            + "thrown: " + e);
+      }
+    } finally {
+      rm.close();
+    }
+  }
+
   @Test(timeout = 300000)
   public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes()
       throws Exception {
@@ -774,11 +862,11 @@ public class TestApplicationMasterService {
         ResourceInformation.VCORES.getUnits(),
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
         DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-    ResourceInformation res_1 = ResourceInformation.newInstance("res_1",
+    ResourceInformation res1 = ResourceInformation.newInstance("res_1",
         ResourceInformation.VCORES.getUnits(), 0, 4);
     riMap.put(ResourceInformation.MEMORY_URI, memory);
     riMap.put(ResourceInformation.VCORES_URI, vcores);
-    riMap.put("res_1", res_1);
+    riMap.put("res_1", res1);
 
     ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
 
@@ -786,13 +874,14 @@ public class TestApplicationMasterService {
         new CapacitySchedulerConfiguration();
     csconf.setResourceComparator(DominantResourceCalculator.class);
 
-    YarnConfiguration conf = new YarnConfiguration(csconf);
+    YarnConfiguration yarnConf = new YarnConfiguration(csconf);
     // Don't reset resource types since we have already configured resource
     // types
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+    yarnConf.setBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES, false);
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
 
-    MockRM rm = new MockRM(conf);
+    MockRM rm = new MockRM(yarnConf);
     rm.start();
 
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
@@ -803,18 +892,21 @@ public class TestApplicationMasterService {
             DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
             ImmutableMap.of("res_1", 4)));
 
-    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
 
-    Assert.assertEquals(Resource.newInstance(1 * GB, 1),
+    Assert.assertEquals(Resource.newInstance(GB, 1),
         leafQueue.getUsedResources());
 
     // Now request resource, memory > allowed
     boolean exception = false;
     try {
-      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
-          TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1)))
-          .numContainers(1).resourceName("*").build()), null);
+      am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
+              .capability(TestUtils.createResource(9 * GB, 1,
+                      ImmutableMap.of("res_1", 1)))
+              .numContainers(1)
+              .resourceName("*")
+              .build()), null);
     } catch (InvalidResourceRequestException e) {
       exception = true;
     }
@@ -822,11 +914,13 @@ public class TestApplicationMasterService {
 
     exception = false;
     try {
-      // Now request resource, vcore > allowed
-      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
-          TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1)))
-          .numContainers(1).resourceName("*")
-          .build()), null);
+      // Now request resource, vcores > allowed
+      am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
+          .capability(
+              TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1)))
+              .numContainers(1)
+              .resourceName("*")
+              .build()), null);
     } catch (InvalidResourceRequestException e) {
       exception = true;
     }
@@ -835,10 +929,12 @@ public class TestApplicationMasterService {
     exception = false;
     try {
       // Now request resource, res_1 > allowed
-      am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
-          TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100)))
-          .numContainers(1).resourceName("*")
-          .build()), null);
+      am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
+              .capability(TestUtils.createResource(8 * GB, 1,
+                      ImmutableMap.of("res_1", 100)))
+              .numContainers(1)
+              .resourceName("*")
+              .build()), null);
     } catch (InvalidResourceRequestException e) {
       exception = true;
     }
@@ -854,7 +950,7 @@ public class TestApplicationMasterService {
       rmContainer.handle(
           new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
     } else {
-      Assert.fail("Cannot find RMContainer");
+      fail("Cannot find RMContainer");
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index 57b195f..9bb3f97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -26,7 +26,9 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
@@ -36,6 +38,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +46,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -64,8 +68,10 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -84,20 +90,79 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.junit.rules.ExpectedException;
 
 public class TestSchedulerUtils {
 
   private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
-  
+  private static Resource configuredMaxAllocation;
+
+  /**
+   * Serves an in-memory resource-types file (encoded as UTF-8) declaring three
+   * custom resource types; any other name is delegated to the parent provider.
+   */
+  private static class CustomResourceTypesConfigurationProvider
+      extends LocalConfigurationProvider {
+    @Override
+    public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+        String name) throws YarnException, IOException {
+      if (!YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+        return super.getConfigurationInputStream(bootstrapConf, name);
+      }
+      return new ByteArrayInputStream(("<configuration>\n"
+          + " <property>\n"
+          + "   <name>yarn.resource-types</name>\n"
+          + "   <value>custom-resource-1,"
+          + "custom-resource-2,custom-resource-3</value>\n"
+          + " </property>\n"
+          + " <property>\n"
+          + "   <name>yarn.resource-types.custom-resource-1.units</name>\n"
+          + "   <value>G</value>\n"
+          + " </property>\n"
+          + " <property>\n"
+          + "   <name>yarn.resource-types.custom-resource-2.units</name>\n"
+          + "   <value>G</value>\n"
+          + " </property>\n"
+          + "</configuration>\n")
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+  }
   private RMContext rmContext = getMockRMContext();
+
   private static YarnConfiguration conf = new YarnConfiguration();
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private void initResourceTypes() {
+    Configuration providerConf = new Configuration(); // selects the provider
+    providerConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        CustomResourceTypesConfigurationProvider.class.getName());
+    ResourceUtils.resetResourceTypes(providerConf);
+  }
+
+  /**
+   * Resets the resource types, then builds the configured maximum allocation,
+   * which needs to be initialized after initResourceTypes() has been called.
+   */
+  @Before
+  public void setUp() {
+    initResourceTypes();
+    configuredMaxAllocation = Resource.newInstance(8192, 4, ImmutableMap.of(
+        "custom-resource-1", Long.MAX_VALUE,
+        "custom-resource-2", Long.MAX_VALUE,
+        "custom-resource-3", Long.MAX_VALUE));
+  }
+
   @Test (timeout = 30000)
   public void testNormalizeRequest() {
     ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
@@ -151,16 +216,18 @@ public class TestSchedulerUtils {
     // multiple of minMemory > maxMemory, then reduce to maxMemory
     SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
         maxResource);
-    assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize());
+    assertEquals(maxResource.getMemorySize(),
+        ask.getCapability().getMemorySize());
 
     // ask is more than max
     maxResource = Resources.createResource(maxMemory, 0);
     ask.setCapability(Resources.createResource(maxMemory + 100));
     SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
         maxResource);
-    assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize());
+    assertEquals(maxResource.getMemorySize(),
+        ask.getCapability().getMemorySize());
   }
-  
+
   @Test (timeout = 30000)
   public void testNormalizeRequestWithDominantResourceCalculator() {
     ResourceCalculator resourceCalculator = new DominantResourceCalculator();
@@ -202,10 +269,11 @@ public class TestSchedulerUtils {
     Set<String> queueAccessibleNodeLabels = Sets.newHashSet();
     QueueInfo queueInfo = mock(QueueInfo.class);
     when(queueInfo.getQueueName()).thenReturn("queue");
-    when(queueInfo.getAccessibleNodeLabels()).thenReturn(queueAccessibleNodeLabels);
+    when(queueInfo.getAccessibleNodeLabels())
+        .thenReturn(queueAccessibleNodeLabels);
     when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean()))
         .thenReturn(queueInfo);
-    
+
     Resource maxResource = Resources.createResource(
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
@@ -364,7 +432,7 @@ public class TestSchedulerUtils {
       rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
           Arrays.asList("x"));
     }
-    Assert.assertTrue("InvalidLabelResourceRequestException excpeted",
+    Assert.assertTrue("InvalidLabelResourceRequestException expected",
         invalidlabelexception);
     // queue is "*", always succeeded
     try {
@@ -611,11 +679,9 @@ public class TestSchedulerUtils {
 
     // more than max vcores
     try {
-      Resource resource =
-          Resources
-              .createResource(
-                  YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-                  YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
+      Resource resource = Resources.createResource(
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -649,10 +715,10 @@ public class TestSchedulerUtils {
     waitForLaunchedState(attempt);
 
     // Create a client to the RM.
-    final Configuration conf = rm.getConfig();
-    final YarnRPC rpc = YarnRPC.create(conf);
+    final Configuration yarnConf = rm.getConfig();
+    final YarnRPC rpc = YarnRPC.create(yarnConf);
 
-    UserGroupInformation currentUser = 
+    UserGroupInformation currentUser =
         UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
     Credentials credentials = containerManager.getContainerCredentials();
     final InetSocketAddress rmBindAddress =
@@ -666,7 +732,7 @@ public class TestSchedulerUtils {
           @Override
           public ApplicationMasterProtocol run() {
             return (ApplicationMasterProtocol) rpc.getProxy(
-              ApplicationMasterProtocol.class, rmBindAddress, conf);
+              ApplicationMasterProtocol.class, rmBindAddress, yarnConf);
           }
         });
 
@@ -776,6 +842,127 @@ public class TestSchedulerUtils {
     }
   }
 
+  /**
+   * Requests "11" of custom-resource-1 while only "0G" is available and
+   * expects an InvalidResourceRequestException with the generated message.
+   */
+  @Test
+  public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit()
+      throws InvalidResourceRequestException {
+    Resource requested = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "11"));
+    Resource available = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "0G"));
+    String expectedMessage = InvalidResourceRequestExceptionMessageGenerator
+        .create().withRequestedResourceType("custom-resource-1")
+        .withRequestedResource(requested).withAvailableAllocation(available)
+        .withMaxAllocation(configuredMaxAllocation).build();
+    exception.expect(InvalidResourceRequestException.class);
+    exception.expectMessage(expectedMessage);
+
+    SchedulerUtils.checkResourceRequestAgainstAvailableResource(
+        requested, available);
+  }
+
+  /**
+   * Requesting "11" of custom-resource-1 with "1G" available must succeed.
+   */
+  @Test
+  public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit2() {
+    Resource requested = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "11"));
+    Resource available = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "1G"));
+
+    try {
+      SchedulerUtils.checkResourceRequestAgainstAvailableResource(
+          requested, available);
+    } catch (InvalidResourceRequestException e) {
+      fail(String.format(
+          "Resource request should be accepted. Requested: %s, available: %s",
+          requested, available));
+    }
+  }
+
+  /**
+   * Requesting "1M" of custom-resource-1 with "120k" available must fail.
+   */
+  @Test
+  public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit()
+      throws InvalidResourceRequestException {
+    Resource requested = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "1M"));
+    Resource available = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "120k"));
+    String expectedMessage = InvalidResourceRequestExceptionMessageGenerator
+        .create().withRequestedResourceType("custom-resource-1")
+        .withRequestedResource(requested).withAvailableAllocation(available)
+        .withMaxAllocation(configuredMaxAllocation).build();
+    exception.expect(InvalidResourceRequestException.class);
+    exception.expectMessage(expectedMessage);
+
+    SchedulerUtils.checkResourceRequestAgainstAvailableResource(
+        requested, available);
+  }
+
+  /**
+   * Requesting "11M" of custom-resource-1 with "1G" available must succeed.
+   */
+  @Test
+  public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit2() {
+    Resource requested = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "11M"));
+    Resource available = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "1G"));
+
+    try {
+      SchedulerUtils.checkResourceRequestAgainstAvailableResource(
+          requested, available);
+    } catch (InvalidResourceRequestException e) {
+      fail(String.format(
+          "Resource request should be accepted. Requested: %s, available: %s",
+          requested, available));
+    }
+  }
+
+  /** "11M" requested, "100M" available (custom-resource-1): must succeed. */
+  @Test
+  public void testCustomResourceRequestedUnitIsSameAsAvailableUnit() {
+    Resource requested = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "11M"));
+    Resource available = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "100M"));
+
+    try {
+      SchedulerUtils.checkResourceRequestAgainstAvailableResource(
+          requested, available);
+    } catch (InvalidResourceRequestException e) {
+      fail(String.format(
+          "Resource request should be accepted. Requested: %s, available: %s",
+          requested, available));
+    }
+  }
+
+  /** "110M" requested, "100M" available (custom-resource-1): must fail. */
+  @Test
+  public void testCustomResourceRequestedUnitIsSameAsAvailableUnit2()
+      throws InvalidResourceRequestException {
+    Resource requested = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "110M"));
+    Resource available = ResourceTypesTestHelper.newResource(1, 1,
+        ImmutableMap.of("custom-resource-1", "100M"));
+
+    String expectedMessage = InvalidResourceRequestExceptionMessageGenerator
+        .create().withRequestedResourceType("custom-resource-1")
+        .withRequestedResource(requested).withAvailableAllocation(available)
+        .withMaxAllocation(configuredMaxAllocation).build();
+    exception.expect(InvalidResourceRequestException.class);
+    exception.expectMessage(expectedMessage);
+
+    SchedulerUtils.checkResourceRequestAgainstAvailableResource(
+        requested, available);
+  }
+
   public static void waitSchedulerApplicationAttemptStopped(
       AbstractYarnScheduler ys,
       ApplicationAttemptId attemptId) throws InterruptedException {
@@ -943,8 +1130,8 @@ public class TestSchedulerUtils {
   public static SchedulerApplication<SchedulerApplicationAttempt>
       verifyAppAddedAndRemovedFromScheduler(
           Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
-          EventHandler<SchedulerEvent> handler, String queueName)
-          throws Exception {
+          EventHandler<SchedulerEvent> handler, String queueName) {
+
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppAddedSchedulerEvent appAddedEvent =
@@ -973,4 +1160,60 @@ public class TestSchedulerUtils {
     when(rmContext.getNodeLabelManager()).thenReturn(nlm);
     return rmContext;
   }
+
+  /**
+   * Assembles the message expected from a rejected resource request.
+   */
+  private static class InvalidResourceRequestExceptionMessageGenerator {
+    private final StringBuilder sb;
+    private Resource requestedResource;
+    private Resource availableAllocation;
+    private Resource configuredMaxAllowedAllocation;
+    private String resourceType;
+
+    InvalidResourceRequestExceptionMessageGenerator(StringBuilder sb) {
+      this.sb = sb;
+    }
+
+    public static InvalidResourceRequestExceptionMessageGenerator create() {
+      return new InvalidResourceRequestExceptionMessageGenerator(
+          new StringBuilder());
+    }
+
+    InvalidResourceRequestExceptionMessageGenerator withRequestedResource(
+        Resource r) {
+      this.requestedResource = r;
+      return this;
+    }
+
+    InvalidResourceRequestExceptionMessageGenerator withRequestedResourceType(
+        String rt) {
+      this.resourceType = rt;
+      return this;
+    }
+
+    InvalidResourceRequestExceptionMessageGenerator withAvailableAllocation(
+        Resource r) {
+      this.availableAllocation = r;
+      return this;
+    }
+
+    InvalidResourceRequestExceptionMessageGenerator withMaxAllocation(
+        Resource r) {
+      this.configuredMaxAllowedAllocation = r;
+      return this;
+    }
+
+    public String build() {
+      // Concatenate so sb is never mutated and build() stays repeatable.
+      return sb + "Invalid resource request, requested resource type=["
+          + resourceType + "] < 0 or greater than maximum allowed allocation. "
+          + "Requested resource=" + requestedResource + ", "
+          + "maximum allowed allocation=" + availableAllocation
+          + ", please note that maximum allowed allocation is calculated "
+          + "by scheduler based on maximum resource of "
+          + "registered NodeManagers, which might be less than "
+          + "configured maximum allocation=" + configuredMaxAllowedAllocation;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org