You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/03/21 00:46:56 UTC

[hadoop] branch YARN-8200 updated (7d1832d -> dcb7d7a)

This is an automated email from the ASF dual-hosted git repository.

jhung pushed a change to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 7d1832d  YARN-9397. Fix empty NMResourceInfo object test failures in branch-2
     new ff91a67  YARN-9291. Backport YARN-7637 to branch-2
     new a9c58d3  YARN-7345. GPU Isolation: Incorrect minor device numbers written to devices.deny file. (Jonathan Hung via wangda)
     new 5ea878d  YARN-7143. FileNotFound handling in ResourceUtils is inconsistent
     new 26e1c49  YARN-7383. Node resource is not parsed correctly for resource names containing dot. Contributed by Gergely Novák.
     new a06fc3f  YARN-8183. Fix ConcurrentModificationException inside RMAppAttemptMetrics#convertAtomicLongMaptoLongMap. (Suma Shivaprasad via wangda)
     new dcb7d7a  YARN-9271. Backport YARN-6927 for resource type support in MapReduce

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mapreduce/v2/app/job/impl/TaskAttemptImpl.java | 141 +++++++-
 .../mapreduce/TestMapreduceConfigFields.java       |  11 +
 .../mapreduce/v2/app/job/impl/TestTaskAttempt.java | 365 ++++++++++++++++++++-
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  68 +++-
 .../java/org/apache/hadoop/mapred/YARNRunner.java  |  86 ++++-
 .../org/apache/hadoop/mapred/TestYARNRunner.java   | 167 ++++++++++
 .../hadoop/yarn/util/resource/ResourceUtils.java   | 105 ++++--
 .../yarn/util/resource/TestResourceUtils.java      |   5 +-
 .../resources/resource-types/node-resources-2.xml  |   5 +
 .../resources/resource-types/resource-types-4.xml  |   7 +-
 .../recovery/NMNullStateStoreService.java          |   1 +
 .../impl/modules/gpu/gpu-module.c                  |   2 +-
 .../test/modules/gpu/test-gpu-module.cc            |  13 +
 .../resources/gpu/TestGpuResourceHandler.java      |  30 ++
 .../rmapp/attempt/RMAppAttemptMetrics.java         |   9 +-
 15 files changed, 943 insertions(+), 72 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 05/06: YARN-8183. Fix ConcurrentModificationException inside RMAppAttemptMetrics#convertAtomicLongMaptoLongMap. (Suma Shivaprasad via wangda)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a06fc3fdb536d5ba6ee00ec356cf97bfa5b77d1a
Author: Wangda Tan <wa...@apache.org>
AuthorDate: Tue Apr 24 17:42:17 2018 -0700

    YARN-8183. Fix ConcurrentModificationException inside RMAppAttemptMetrics#convertAtomicLongMaptoLongMap. (Suma Shivaprasad via wangda)
    
    Change-Id: I347871d672001653a3afe2e99adefd74e0d798cd
    (cherry picked from commit bb3c504764f807fccba7f28298a12e2296f284cb)
    (cherry picked from commit 3043a93d461fd8b9ccc2ff4b8d17e5430ed77615)
---
 .../resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java       | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
index 0982ef9..e68c5d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -53,8 +54,8 @@ public class RMAppAttemptMetrics {
   
   private ReadLock readLock;
   private WriteLock writeLock;
-  private Map<String, AtomicLong> resourceUsageMap = new HashMap<>();
-  private Map<String, AtomicLong> preemptedResourceMap = new HashMap<>();
+  private Map<String, AtomicLong> resourceUsageMap = new ConcurrentHashMap<>();
+  private Map<String, AtomicLong> preemptedResourceMap = new ConcurrentHashMap<>();
   private RMContext rmContext;
 
   private int[][] localityStatistics =
@@ -97,7 +98,7 @@ public class RMAppAttemptMetrics {
   public Resource getResourcePreempted() {
     try {
       readLock.lock();
-      return resourcePreempted;
+      return Resource.newInstance(resourcePreempted);
     } finally {
       readLock.unlock();
     }
@@ -229,7 +230,7 @@ public class RMAppAttemptMetrics {
   }
 
   public Resource getApplicationAttemptHeadroom() {
-    return applicationHeadroom;
+    return Resource.newInstance(applicationHeadroom);
   }
 
   public void setApplicationAttemptHeadRoom(Resource headRoom) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 04/06: YARN-7383. Node resource is not parsed correctly for resource names containing dot. Contributed by Gergely Novák.

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 26e1c49d28cc111f8e3f7266b71b24c67f453f3a
Author: Sunil G <su...@apache.org>
AuthorDate: Wed Dec 13 22:00:07 2017 +0530

    YARN-7383. Node resource is not parsed correctly for resource names containing dot. Contributed by Gergely Novák.
---
 .../apache/hadoop/yarn/util/resource/ResourceUtils.java   | 15 ++++++---------
 .../hadoop/yarn/util/resource/TestResourceUtils.java      |  5 ++++-
 .../test/resources/resource-types/node-resources-2.xml    |  5 +++++
 .../test/resources/resource-types/resource-types-4.xml    |  7 ++++++-
 4 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index abf58a6..65eb5a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -461,21 +461,18 @@ public class ResourceUtils {
     for (Map.Entry<String, String> entry : conf) {
       String key = entry.getKey();
       String value = entry.getValue();
-
-      if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
-        addResourceInformation(key, value, nodeResources);
-      }
+      addResourceTypeInformation(key, value, nodeResources);
     }
 
     return nodeResources;
   }
 
-  private static void addResourceInformation(String prop, String value,
+  private static void addResourceTypeInformation(String prop, String value,
       Map<String, ResourceInformation> nodeResources) {
-    String[] parts = prop.split("\\.");
-    LOG.info("Found resource entry " + prop);
-    if (parts.length == 4) {
-      String resourceType = parts[3];
+    if (prop.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
+      LOG.info("Found resource entry " + prop);
+      String resourceType = prop.substring(
+          YarnConfiguration.NM_RESOURCES_PREFIX.length());
       if (!nodeResources.containsKey(resourceType)) {
         nodeResources
             .put(resourceType, ResourceInformation.newInstance(resourceType));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
index 80555ca..b511705 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
@@ -124,9 +124,10 @@ public class TestResourceUtils {
         new ResourceFileInformation("resource-types-3.xml", 3);
     testFile3.resourceNameUnitsMap.put("resource2", "");
     ResourceFileInformation testFile4 =
-        new ResourceFileInformation("resource-types-4.xml", 4);
+        new ResourceFileInformation("resource-types-4.xml", 5);
     testFile4.resourceNameUnitsMap.put("resource1", "G");
     testFile4.resourceNameUnitsMap.put("resource2", "m");
+    testFile4.resourceNameUnitsMap.put("yarn.io/gpu", "");
 
     ResourceFileInformation[] tests = {testFile1, testFile2, testFile3,
         testFile4};
@@ -292,6 +293,8 @@ public class TestResourceUtils {
         ResourceInformation.newInstance("resource1", "Gi", 5L));
     test3Resources.setResourceInformation("resource2",
         ResourceInformation.newInstance("resource2", "m", 2L));
+    test3Resources.setResourceInformation("yarn.io/gpu",
+        ResourceInformation.newInstance("yarn.io/gpu", "", 1));
     testRun.put("node-resources-2.xml", test3Resources);
 
     for (Map.Entry<String, Resource> entry : testRun.entrySet()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/node-resources-2.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/node-resources-2.xml
index 9d9b3dc..382d5dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/node-resources-2.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/node-resources-2.xml
@@ -36,4 +36,9 @@ limitations under the License. See accompanying LICENSE file.
    <value>2m</value>
  </property>
 
+ <property>
+   <name>yarn.nodemanager.resource-type.yarn.io/gpu</name>
+   <value>1</value>
+ </property>
+
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-4.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-4.xml
index c84316a..ea8d2bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-4.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-4.xml
@@ -18,7 +18,7 @@ limitations under the License. See accompanying LICENSE file.
 
  <property>
    <name>yarn.resource-types</name>
-   <value>resource1,resource2</value>
+   <value>resource1,resource2,yarn.io/gpu</value>
  </property>
 
  <property>
@@ -31,4 +31,9 @@ limitations under the License. See accompanying LICENSE file.
    <value>m</value>
  </property>
 
+ <property>
+   <name>yarn.resource-types.yarn.io/gpu.units</name>
+   <value></value>
+ </property>
+
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/06: YARN-9291. Backport YARN-7637 to branch-2

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ff91a676f2fd8fcc3bc285ba0d37291a4103c396
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Mar 20 17:45:01 2019 -0700

    YARN-9291. Backport YARN-7637 to branch-2
---
 .../recovery/NMNullStateStoreService.java          |  1 +
 .../resources/gpu/TestGpuResourceHandler.java      | 30 ++++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 7d1010f..95ec61a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -272,6 +272,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
   public void storeAssignedResources(Container container,
       String resourceType, List<Serializable> assignedResources)
       throws IOException {
+    updateContainerResourceMapping(container, resourceType, assignedResources);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java
index b5796df..7a3bd02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
 import org.junit.Assert;
@@ -349,6 +350,35 @@ public class TestGpuResourceHandler {
   }
 
   @Test
+  public void testAllocationStoredWithNULLStateStore() throws Exception {
+    NMNullStateStoreService mockNMNULLStateStore = mock(NMNullStateStoreService.class);
+
+    Context nmnctx = mock(Context.class);
+    when(nmnctx.getNMStateStore()).thenReturn(mockNMNULLStateStore);
+
+    GpuResourceHandlerImpl gpuNULLStateResourceHandler =
+        new GpuResourceHandlerImpl(nmnctx, mockCGroupsHandler,
+        mockPrivilegedExecutor);
+
+    Configuration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:3,3:4");
+    GpuDiscoverer.getInstance().initialize(conf);
+
+    gpuNULLStateResourceHandler.bootstrap(conf);
+    Assert.assertEquals(4,
+        gpuNULLStateResourceHandler.getGpuAllocator().getAvailableGpus());
+
+    /* Start container 1, asks 3 containers */
+    Container container = mockContainerWithGpuRequest(1, 3);
+    gpuNULLStateResourceHandler.preStart(container);
+
+    verify(nmnctx.getNMStateStore()).storeAssignedResources(container,
+        ResourceInformation.GPU_URI, Arrays
+            .<Serializable>asList(new GpuDevice(0, 0), new GpuDevice(1, 1),
+                new GpuDevice(2, 3)));
+  }
+
+  @Test
   public void testRecoverResourceAllocation() throws Exception {
     Configuration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:3,3:4");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/06: YARN-7345. GPU Isolation: Incorrect minor device numbers written to devices.deny file. (Jonathan Hung via wangda)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a9c58d38c1926300029c44f9f13d9cf1093dd650
Author: Wangda Tan <wa...@apache.org>
AuthorDate: Thu Oct 19 14:45:44 2017 -0700

    YARN-7345. GPU Isolation: Incorrect minor device numbers written to devices.deny file. (Jonathan Hung via wangda)
---
 .../native/container-executor/impl/modules/gpu/gpu-module.c |  2 +-
 .../container-executor/test/modules/gpu/test-gpu-module.cc  | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/modules/gpu/gpu-module.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/modules/gpu/gpu-module.c
index f96645d..1a1b164 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/modules/gpu/gpu-module.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/modules/gpu/gpu-module.c
@@ -108,7 +108,7 @@ static int internal_handle_gpu_request(
     char param_value[128];
     memset(param_value, 0, sizeof(param_value));
     snprintf(param_value, sizeof(param_value), "c %d:%d rwm",
-             major_device_number, i);
+             major_device_number, minor_devices[i]);
 
     int rc = update_cgroups_parameters_func_p("devices", "deny",
       container_id, param_value);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/modules/gpu/test-gpu-module.cc b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/modules/gpu/test-gpu-module.cc
index 7e41fb4..b3d93dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/modules/gpu/test-gpu-module.cc
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/modules/gpu/test-gpu-module.cc
@@ -165,6 +165,19 @@ TEST_F(TestGpuModule, test_verify_gpu_module_calls_cgroup_parameter) {
 
   // Verify cgroups parameters
   verify_param_updated_to_cgroups(0, NULL);
+
+  /* Test case 3: block 2 non-sequential devices */
+  cgroups_parameters_invoked.clear();
+  char* argv_2[] = { (char*) "--module-gpu", (char*) "--excluded_gpus", (char*) "1,3",
+                   (char*) "--container_id", container_id };
+  rc = handle_gpu_request(&mock_update_cgroups_parameters,
+     "gpu", 5, argv_2);
+  ASSERT_EQ(0, rc) << "Should success.\n";
+
+  // Verify cgroups parameters
+  const char* expected_cgroups_argv_2[] = { "devices", "deny", container_id, "c 195:1 rwm",
+    "devices", "deny", container_id, "c 195:3 rwm"};
+  verify_param_updated_to_cgroups(8, expected_cgroups_argv_2);
 }
 
 TEST_F(TestGpuModule, test_illegal_cli_parameters) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 03/06: YARN-7143. FileNotFound handling in ResourceUtils is inconsistent

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 5ea878dc059e43009b51cdcded164a6b6f5b63e6
Author: Daniel Templeton <te...@apache.org>
AuthorDate: Thu Nov 9 10:36:49 2017 -0800

    YARN-7143. FileNotFound handling in ResourceUtils is inconsistent
    
    Change-Id: Ib1bb487e14a15edd2b5a42cf5078c5a2b295f069
    (cherry picked from commit db82a41d94872cea4d0c1bb1336916cebc2faeec)
---
 .../hadoop/yarn/util/resource/ResourceUtils.java   | 52 +++++++++-------------
 1 file changed, 22 insertions(+), 30 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index f3edc74..abf58a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -338,18 +338,14 @@ public class ResourceUtils {
     if (!initializedResources) {
       synchronized (ResourceUtils.class) {
         if (!initializedResources) {
-          if (conf == null) {
-            conf = new YarnConfiguration();
-          }
-          try {
-            addResourcesFileToConf(resourceFile, conf);
-            LOG.debug("Found " + resourceFile + ", adding to configuration");
-          } catch (FileNotFoundException fe) {
-            LOG.info("Unable to find '" + resourceFile
-                + "'. Falling back to memory and vcores as resources.");
+          Configuration resConf = conf;
+
+          if (resConf == null) {
+            resConf = new YarnConfiguration();
           }
-          initializeResourcesMap(conf);
 
+          addResourcesFileToConf(resourceFile, resConf);
+          initializeResourcesMap(resConf);
         }
       }
     }
@@ -386,21 +382,17 @@ public class ResourceUtils {
   }
 
   private static void addResourcesFileToConf(String resourceFile,
-      Configuration conf) throws FileNotFoundException {
+      Configuration conf) {
     try {
       InputStream ris = getConfInputStream(resourceFile, conf);
       LOG.debug("Found " + resourceFile + ", adding to configuration");
       conf.addResource(ris);
     } catch (FileNotFoundException fe) {
-      throw fe;
-    } catch (IOException ie) {
+      LOG.info("Unable to find '" + resourceFile + "'.");
+    } catch (IOException | YarnException ex) {
       LOG.fatal("Exception trying to read resource types configuration '"
-          + resourceFile + "'.", ie);
-      throw new YarnRuntimeException(ie);
-    } catch (YarnException ye) {
-      LOG.fatal("YARN Exception trying to read resource types configuration '"
-          + resourceFile + "'.", ye);
-      throw new YarnRuntimeException(ye);
+          + resourceFile + "'.", ex);
+      throw new YarnRuntimeException(ex);
     }
   }
 
@@ -462,19 +454,19 @@ public class ResourceUtils {
   private static Map<String, ResourceInformation> initializeNodeResourceInformation(
       Configuration conf) {
     Map<String, ResourceInformation> nodeResources = new HashMap<>();
-    try {
-      addResourcesFileToConf(
-          YarnConfiguration.NODE_RESOURCES_CONFIGURATION_FILE, conf);
-      for (Map.Entry<String, String> entry : conf) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
-          addResourceInformation(key, value, nodeResources);
-        }
+
+    addResourcesFileToConf(YarnConfiguration.NODE_RESOURCES_CONFIGURATION_FILE,
+        conf);
+
+    for (Map.Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
+        addResourceInformation(key, value, nodeResources);
       }
-    } catch (FileNotFoundException fe) {
-      LOG.info("Couldn't find node resources file");
     }
+
     return nodeResources;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 06/06: YARN-9271. Backport YARN-6927 for resource type support in MapReduce

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit dcb7d7a451bc7d4f589fba5853746a0265bc8908
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Mar 20 17:46:35 2019 -0700

    YARN-9271. Backport YARN-6927 for resource type support in MapReduce
---
 .../mapreduce/v2/app/job/impl/TaskAttemptImpl.java | 141 +++++++-
 .../mapreduce/TestMapreduceConfigFields.java       |  11 +
 .../mapreduce/v2/app/job/impl/TestTaskAttempt.java | 365 ++++++++++++++++++++-
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  68 +++-
 .../java/org/apache/hadoop/mapred/YARNRunner.java  |  86 ++++-
 .../org/apache/hadoop/mapred/TestYARNRunner.java   | 167 ++++++++++
 .../hadoop/yarn/util/resource/ResourceUtils.java   |  44 +++
 7 files changed, 853 insertions(+), 29 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index dfc3adb..3f37d4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import static org.apache.commons.lang.StringUtils.isEmpty;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -139,6 +142,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -664,12 +669,8 @@ public abstract class TaskAttemptImpl implements
     this.jobFile = jobFile;
     this.partition = partition;
 
-    //TODO:create the resource reqt for this Task attempt
     this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
-    this.resourceCapability.setMemorySize(
-        getMemoryRequired(conf, taskId.getTaskType()));
-    this.resourceCapability.setVirtualCores(
-        getCpuRequired(conf, taskId.getTaskType()));
+    populateResourceCapability(taskId.getTaskType());
 
     this.dataLocalHosts = resolveHosts(dataLocalHosts);
     RackResolver.init(conf);
@@ -701,21 +702,133 @@ public abstract class TaskAttemptImpl implements
     return memory;
   }
 
+  private void populateResourceCapability(TaskType taskType) {
+    String resourceTypePrefix =
+        getResourceTypePrefix(taskType);
+    boolean memorySet = false;
+    boolean cpuVcoresSet = false;
+    if (resourceTypePrefix != null) {
+      List<ResourceInformation> resourceRequests =
+          ResourceUtils.getRequestedResourcesFromConfig(conf,
+              resourceTypePrefix);
+      for (ResourceInformation resourceRequest : resourceRequests) {
+        String resourceName = resourceRequest.getName();
+        if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
+            MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
+                resourceName)) {
+          if (memorySet) {
+            throw new IllegalArgumentException(
+                "Only one of the following keys " +
+                    "can be specified for a single job: " +
+                    MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
+                    MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
+          }
+          String units = isEmpty(resourceRequest.getUnits()) ?
+              ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
+                resourceRequest.getUnits();
+          this.resourceCapability.setMemorySize(
+              UnitsConversionUtil.convert(units, "Mi",
+                  resourceRequest.getValue()));
+          memorySet = true;
+          String memoryKey = getMemoryKey(taskType);
+          if (memoryKey != null && conf.get(memoryKey) != null) {
+            LOG.warn("Configuration " + resourceTypePrefix + resourceName +
+                "=" + resourceRequest.getValue() + resourceRequest.getUnits() +
+                " is overriding the " + memoryKey + "=" + conf.get(memoryKey) +
+                " configuration");
+          }
+        } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(
+            resourceName)) {
+          this.resourceCapability.setVirtualCores(
+              (int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "",
+                  resourceRequest.getValue()));
+          cpuVcoresSet = true;
+          String cpuKey = getCpuVcoresKey(taskType);
+          if (cpuKey != null && conf.get(cpuKey) != null) {
+            LOG.warn("Configuration " + resourceTypePrefix +
+                MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" +
+                resourceRequest.getValue() + resourceRequest.getUnits() +
+                " is overriding the " + cpuKey + "=" +
+                conf.get(cpuKey) + " configuration");
+          }
+        } else {
+          ResourceInformation resourceInformation =
+              this.resourceCapability.getResourceInformation(resourceName);
+          resourceInformation.setUnits(resourceRequest.getUnits());
+          resourceInformation.setValue(resourceRequest.getValue());
+          this.resourceCapability.setResourceInformation(resourceName,
+              resourceInformation);
+        }
+      }
+    }
+    if (!memorySet) {
+      this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType));
+    }
+    if (!cpuVcoresSet) {
+      this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
+    }
+  }
+
+  private String getCpuVcoresKey(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.MAP_CPU_VCORES;
+    case REDUCE:
+      return MRJobConfig.REDUCE_CPU_VCORES;
+    default:
+      return null;
+    }
+  }
+
+  private String getMemoryKey(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.MAP_MEMORY_MB;
+    case REDUCE:
+      return MRJobConfig.REDUCE_MEMORY_MB;
+    default:
+      return null;
+    }
+  }
+
+  private Integer getCpuVcoreDefault(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.DEFAULT_MAP_CPU_VCORES;
+    case REDUCE:
+      return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES;
+    default:
+      return null;
+    }
+  }
+
   private int getCpuRequired(Configuration conf, TaskType taskType) {
     int vcores = 1;
-    if (taskType == TaskType.MAP)  {
-      vcores =
-          conf.getInt(MRJobConfig.MAP_CPU_VCORES,
-              MRJobConfig.DEFAULT_MAP_CPU_VCORES);
-    } else if (taskType == TaskType.REDUCE) {
-      vcores =
-          conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
-              MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+    String cpuVcoreKey = getCpuVcoresKey(taskType);
+    if (cpuVcoreKey != null) {
+      Integer defaultCpuVcores = getCpuVcoreDefault(taskType);
+      if (null == defaultCpuVcores) {
+        defaultCpuVcores = vcores;
+      }
+      vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores);
     }
-    
     return vcores;
   }
 
+  private String getResourceTypePrefix(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX;
+    case REDUCE:
+      return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX;
+    default:
+      LOG.info("TaskType " + taskType +
+          " does not support custom resource types - this support can be " +
+          "added in " + getClass().getSimpleName());
+      return null;
+    }
+  }
+
   /**
    * Create a {@link LocalResource} record with all the given parameters.
    * The NM that hosts AM container will upload resources to shared cache.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
index 096cec9..f469aad 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
@@ -78,6 +78,17 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("mapreduce.local.clientfactory.class.name");
     xmlPropsToSkipCompare.add("mapreduce.jobtracker.system.dir");
     xmlPropsToSkipCompare.add("mapreduce.jobtracker.staging.root.dir");
+
+    // Resource type related properties are only prefixes,
+    // they need to be postfixed with the resource name
+    // in order to take effect.
+    // There is nothing to be added to mapred-default.xml
+    configurationPropsToSkipCompare.add(
+        MRJobConfig.MR_AM_RESOURCE_PREFIX);
+    configurationPropsToSkipCompare.add(
+        MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
+    configurationPropsToSkipCompare.add(
+        MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
   }
 
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 60a2177..e055798 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -28,14 +28,21 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import com.google.common.base.Supplier;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -83,24 +91,36 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.common.collect.ImmutableList;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
-	
+
+  private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
+
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -108,6 +128,63 @@ public class TestTaskAttempt{
     }
   }
 
+  private static class CustomResourceTypesConfigurationProvider
+      extends LocalConfigurationProvider {
+
+    @Override
+    public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+        String name) throws YarnException, IOException {
+      if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+        return new ByteArrayInputStream(
+            ("<configuration>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types</name>\n" +
+            "   <value>a-custom-resource</value>\n" +
+            " </property>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types.a-custom-resource.units</name>\n" +
+            "   <value>G</value>\n" +
+            " </property>\n" +
+            "</configuration>\n").getBytes());
+      } else {
+        return super.getConfigurationInputStream(bootstrapConf, name);
+      }
+    }
+  }
+
+  private static class TestAppender extends AppenderSkeleton {
+
+    private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    protected void append(LoggingEvent arg0) {
+      logEvents.add(arg0);
+    }
+
+    private List<LoggingEvent> getLogEvents() {
+      return logEvents;
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() {
+    ResourceUtils.resetResourceTypes(new Configuration());
+  }
+
+  @After
+  public void tearDown() {
+    ResourceUtils.resetResourceTypes(new Configuration());
+  }
+
   @Test
   public void testMRAppHistoryForMap() throws Exception {
     MRApp app = new FailingAttemptsMRApp(1, 0);
@@ -329,17 +406,18 @@ public class TestTaskAttempt{
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
     Clock clock = SystemClock.getInstance();
-    return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
+    return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo,
+        clock, new JobConf());
   }
 
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
-      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
+      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo,
+      Clock clock, JobConf jobConf) {
     ApplicationId appId = ApplicationId.newInstance(1, 1);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     Path jobFile = mock(Path.class);
-    JobConf jobConf = new JobConf();
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
             taskSplitMetaInfo, jobConf, taListener, null,
@@ -347,6 +425,20 @@ public class TestTaskAttempt{
     return taImpl;
   }
 
+  private TaskAttemptImpl createReduceTaskAttemptImplForTest(
+      EventHandler eventHandler, Clock clock, JobConf jobConf) {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    Path jobFile = mock(Path.class);
+    TaskAttemptImpl taImpl =
+        new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            1, jobConf, taListener, null,
+            null, clock, null);
+    return taImpl;
+  }
+
   private void testMRAppHistory(MRApp app) throws Exception {
     Configuration conf = new Configuration();
     Job job = app.submit(conf);
@@ -1423,6 +1515,271 @@ public class TestTaskAttempt{
     assertFalse("InternalError occurred", eventHandler.internalError);
   }
 
+  @Test
+  public void testMapperCustomResourceTypes() {
+    initResourceTypes();
+    EventHandler eventHandler = mock(EventHandler.class);
+    TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo();
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX
+        + CUSTOM_RESOURCE_NAME, 7L);
+    TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler,
+        taskSplitMetaInfo, clock, jobConf);
+    ResourceInformation resourceInfo =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getResourceInformation(CUSTOM_RESOURCE_NAME);
+    assertEquals("Expecting the default unit (G)",
+        "G", resourceInfo.getUnits());
+    assertEquals(7L, resourceInfo.getValue());
+  }
+
+  @Test
+  public void testReducerCustomResourceTypes() {
+    initResourceTypes();
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+        + CUSTOM_RESOURCE_NAME, "3m");
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    ResourceInformation resourceInfo =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getResourceInformation(CUSTOM_RESOURCE_NAME);
+    assertEquals("Expecting the specified unit (m)",
+        "m", resourceInfo.getUnits());
+    assertEquals(3L, resourceInfo.getValue());
+  }
+
+  @Test
+  public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    long memorySize =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getMemorySize();
+    assertEquals(2048, memorySize);
+  }
+
+  @Test
+  public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi");
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    long memorySize =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getMemorySize();
+    assertEquals(2048, memorySize);
+  }
+
+  @Test
+  public void testReducerMemoryRequestDefaultMemory() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
+    long memorySize =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getMemorySize();
+    assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize);
+  }
+
+  @Test
+  public void testReducerMemoryRequestWithoutUnits() {
+    Clock clock = SystemClock.getInstance();
+    for (String memoryResourceName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      EventHandler eventHandler = mock(EventHandler.class);
+      JobConf jobConf = new JobConf();
+      jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+          memoryResourceName, 2048);
+      TaskAttemptImpl taImpl =
+          createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+      long memorySize =
+          getResourceInfoFromContainerRequest(taImpl, eventHandler).
+          getMemorySize();
+      assertEquals(2048, memorySize);
+    }
+  }
+
+  @Test
+  public void testReducerMemoryRequestOverriding() {
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      TestAppender testAppender = new TestAppender();
+      final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
+      try {
+        logger.addAppender(testAppender);
+        EventHandler eventHandler = mock(EventHandler.class);
+        Clock clock = SystemClock.getInstance();
+        JobConf jobConf = new JobConf();
+        jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
+            "3Gi");
+        jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+        TaskAttemptImpl taImpl =
+            createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+        long memorySize =
+            getResourceInfoFromContainerRequest(taImpl, eventHandler).
+            getMemorySize();
+        assertEquals(3072, memorySize);
+        boolean foundLogWarning = false;
+        for (LoggingEvent e : testAppender.getLogEvents()) {
+          if (e.getLevel() == Level.WARN && ("Configuration " +
+                "mapreduce.reduce.resource." + memoryName + "=3Gi is " +
+                "overriding the mapreduce.reduce.memory.mb=2048 configuration")
+          .equals(e.getMessage())) {
+            foundLogWarning = true;
+            break;
+          }
+        }
+        assertTrue(foundLogWarning);
+      } finally {
+        logger.removeAppender(testAppender);
+      }
+    }
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testReducerMemoryRequestMultipleName() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
+          "3Gi");
+    }
+    createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+  }
+
+  @Test
+  public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3);
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    int vCores =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getVirtualCores();
+    assertEquals(3, vCores);
+  }
+
+  @Test
+  public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+        MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5");
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    int vCores =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getVirtualCores();
+    assertEquals(5, vCores);
+  }
+
+  @Test
+  public void testReducerCpuRequestDefaultMemory() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
+    int vCores =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getVirtualCores();
+    assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores);
+  }
+
+  @Test
+  public void testReducerCpuRequestOverriding() {
+    TestAppender testAppender = new TestAppender();
+    final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
+    try {
+      logger.addAppender(testAppender);
+      EventHandler eventHandler = mock(EventHandler.class);
+      Clock clock = SystemClock.getInstance();
+      JobConf jobConf = new JobConf();
+      jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+          MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7");
+      jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9);
+      TaskAttemptImpl taImpl =
+          createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+      long vCores =
+          getResourceInfoFromContainerRequest(taImpl, eventHandler).
+          getVirtualCores();
+      assertEquals(7, vCores);
+      boolean foundLogWarning = false;
+      for (LoggingEvent e : testAppender.getLogEvents()) {
+        if (e.getLevel() == Level.WARN && ("Configuration " +
+              "mapreduce.reduce.resource.vcores=7 is overriding the " +
+              "mapreduce.reduce.cpu.vcores=9 configuration"
+        ).equals(e.getMessage())) {
+          foundLogWarning = true;
+          break;
+        }
+      }
+      assertTrue(foundLogWarning);
+    } finally {
+      logger.removeAppender(testAppender);
+    }
+  }
+
+  private Resource getResourceInfoFromContainerRequest(
+      TaskAttemptImpl taImpl, EventHandler eventHandler) {
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_SCHEDULE));
+
+    assertEquals("Task attempt is not in STARTING state", taImpl.getState(),
+        TaskAttemptState.STARTING);
+
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(captor.capture());
+
+    List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>();
+    for (Event e : captor.getAllValues()) {
+      if (e instanceof ContainerRequestEvent) {
+        containerRequestEvents.add((ContainerRequestEvent) e);
+      }
+    }
+    assertEquals("Expected one ContainerRequestEvent after scheduling "
+        + "task attempt", 1, containerRequestEvents.size());
+
+    return containerRequestEvents.get(0).getCapability();
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testReducerCustomResourceTypeWithInvalidUnit() {
+    initResourceTypes();
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+        + CUSTOM_RESOURCE_NAME, "3z");
+    createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+  }
+
+  private void initResourceTypes() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        CustomResourceTypesConfigurationProvider.class.getName());
+    ResourceUtils.resetResourceTypes(conf);
+  }
+
   private void setupTaskAttemptFinishingMonitor(
       EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
     TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index d666123..5a72def 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -360,12 +360,47 @@ public interface MRJobConfig {
 
   public static final String MAP_INPUT_START = "mapreduce.map.input.start";
 
+  /**
+   * Configuration key for specifying memory requirement for the mapper.
+   * Kept for backward-compatibility, mapreduce.map.resource.memory
+   * is the new preferred way to specify this.
+   */
   public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
   public static final int DEFAULT_MAP_MEMORY_MB = 1024;
 
+  /**
+   * Configuration key for specifying CPU requirement for the mapper.
+   * Kept for backward-compatibility, mapreduce.map.resource.vcores
+   * is the new preferred way to specify this.
+   */
   public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
   public static final int DEFAULT_MAP_CPU_VCORES = 1;
 
+  /**
+   * Custom resource names required by the mapper should be
+   * appended to this prefix, the value's format is {amount}[ ][{unit}].
+   * If no unit is defined, the default unit will be used.
+   * Standard resource names: memory (default unit: Mi), vcores
+   */
+  public static final String MAP_RESOURCE_TYPE_PREFIX =
+      "mapreduce.map.resource.";
+
+  /**
+   * Resource type name for CPU vcores.
+   */
+  public static final String RESOURCE_TYPE_NAME_VCORE = "vcores";
+
+  /**
+   * Resource type name for memory.
+   */
+  public static final String RESOURCE_TYPE_NAME_MEMORY = "memory";
+
+  /**
+   * Alternative resource type name for memory.
+   */
+  public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY =
+      "memory-mb";
+
   public static final String MAP_ENV = "mapreduce.map.env";
 
   public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@@ -408,12 +443,31 @@ public interface MRJobConfig {
 
   public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
 
+  /**
+   * Configuration key for specifying memory requirement for the reducer.
+   * Kept for backward-compatibility, mapreduce.reduce.resource.memory
+   * is the new preferred way to specify this.
+   */
   public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
   public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
 
+  /**
+   * Configuration key for specifying CPU requirement for the reducer.
+   * Kept for backward-compatibility, mapreduce.reduce.resource.vcores
+   * is the new preferred way to specify this.
+   */
   public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
   public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
 
+  /**
+   * Resource names required by the reducer should be
+   * appended to this prefix, the value's format is {amount}[ ][{unit}].
+   * If no unit is defined, the default unit will be used.
+   * Standard resource names: memory (default unit: Mi), vcores
+   */
+  public static final String REDUCE_RESOURCE_TYPE_PREFIX =
+      "mapreduce.reduce.resource.";
+
   public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
 
   public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
@@ -599,7 +653,10 @@ public interface MRJobConfig {
   public static final String DEFAULT_MR_AM_STAGING_DIR = 
     "/tmp/hadoop-yarn/staging";
 
-  /** The amount of memory the MR app master needs.*/
+  /** The amount of memory the MR app master needs.
+   * Kept for backward-compatibility, yarn.app.mapreduce.am.resource.memory is
+   * the new preferred way to specify this
+   */
   public static final String MR_AM_VMEM_MB =
     MR_AM_PREFIX+"resource.mb";
   public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
@@ -609,6 +666,15 @@ public interface MRJobConfig {
     MR_AM_PREFIX+"resource.cpu-vcores";
   public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
 
+  /**
+   * Resource names required by the MR AM should be
+   * appended to this prefix, the value's format is {amount}[ ][{unit}].
+   * If no unit is defined, the default unit will be used
+   * Standard resource names: memory (default unit: Mi), vcores
+   */
+  public static final String MR_AM_RESOURCE_PREFIX =
+      MR_AM_PREFIX + "resource.";
+
   /** Command line arguments passed to the MR app master.*/
   public static final String MR_AM_COMMAND_OPTS =
     MR_AM_PREFIX+"command-opts";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index a23ff34..12a3079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.apache.commons.lang.StringUtils.isEmpty;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -93,6 +97,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -659,16 +665,76 @@ public class YARNRunner implements ClientProtocol {
 
   private List<ResourceRequest> generateResourceRequests() throws IOException {
     Resource capability = recordFactory.newRecordInstance(Resource.class);
-    capability.setMemorySize(
-        conf.getInt(
-            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
-        )
-    );
-    capability.setVirtualCores(
-        conf.getInt(
-            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
-        )
-    );
+    boolean memorySet = false;
+    boolean cpuVcoresSet = false;
+    List<ResourceInformation> resourceRequests = ResourceUtils
+        .getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX);
+    for (ResourceInformation resourceReq : resourceRequests) {
+      String resourceName = resourceReq.getName();
+      if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
+          MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
+              resourceName)) {
+        if (memorySet) {
+          throw new IllegalArgumentException(
+              "Only one of the following keys " +
+                  "can be specified for a single job: " +
+                  MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
+                  MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
+        }
+        String units = isEmpty(resourceReq.getUnits()) ?
+            ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
+              resourceReq.getUnits();
+        capability.setMemorySize(
+            UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue()));
+        memorySet = true;
+        if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) {
+          LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
+              resourceName + "=" + resourceReq.getValue() +
+              resourceReq.getUnits() + " is overriding the " +
+              MRJobConfig.MR_AM_VMEM_MB + "=" +
+              conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration");
+        }
+      } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) {
+        capability.setVirtualCores(
+            (int) UnitsConversionUtil.convert(resourceReq.getUnits(), "",
+                resourceReq.getValue()));
+        cpuVcoresSet = true;
+        if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) {
+          LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
+              resourceName + "=" + resourceReq.getValue() +
+              resourceReq.getUnits() + " is overriding the " +
+              MRJobConfig.MR_AM_CPU_VCORES + "=" +
+              conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration");
+        }
+      } else if (!MRJobConfig.MR_AM_VMEM_MB.equals(
+          MR_AM_RESOURCE_PREFIX + resourceName) &&
+          !MRJobConfig.MR_AM_CPU_VCORES.equals(
+              MR_AM_RESOURCE_PREFIX + resourceName)) {
+        // the "mb", "cpu-vcores" resource types are not processed here
+        // since the yarn.app.mapreduce.am.resource.mb,
+        // yarn.app.mapreduce.am.resource.cpu-vcores keys are used for
+        // backward-compatibility - which is handled after this loop
+        ResourceInformation resourceInformation = capability
+            .getResourceInformation(resourceName);
+        resourceInformation.setUnits(resourceReq.getUnits());
+        resourceInformation.setValue(resourceReq.getValue());
+        capability.setResourceInformation(resourceName, resourceInformation);
+      }
+    }
+    if (!memorySet) {
+      capability.setMemorySize(
+          conf.getInt(
+              MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+          )
+      );
+    }
+    if (!cpuVcoresSet) {
+      capability.setVirtualCores(
+          conf.getInt(
+              MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+          )
+      );
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("AppMaster capability = " + capability);
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c79b08e..ecb396e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -32,10 +32,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -43,6 +45,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,6 +72,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -96,28 +100,37 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.log4j.WriterAppender;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Test YarnRunner and make sure the client side plugin works
  * fine
@@ -131,6 +144,53 @@ public class TestYARNRunner {
       MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
           MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
 
+  private static class CustomResourceTypesConfigurationProvider
+      extends LocalConfigurationProvider {
+
+    @Override
+    public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+        String name) throws YarnException, IOException {
+      if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+        return new ByteArrayInputStream(
+            ("<configuration>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types</name>\n" +
+            "   <value>a-custom-resource</value>\n" +
+            " </property>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types.a-custom-resource.units</name>\n" +
+            "   <value>G</value>\n" +
+            " </property>\n" +
+            "</configuration>\n").getBytes());
+      } else {
+        return super.getConfigurationInputStream(bootstrapConf, name);
+      }
+    }
+  }
+
+  private static class TestAppender extends AppenderSkeleton {
+
+    private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    protected void append(LoggingEvent arg0) {
+      logEvents.add(arg0);
+    }
+
+    private List<LoggingEvent> getLogEvents() {
+      return logEvents;
+    }
+  }
+
   private YARNRunner yarnRunner;
   private ResourceMgrDelegate resourceMgrDelegate;
   private YarnConfiguration conf;
@@ -143,6 +203,11 @@ public class TestYARNRunner {
   private  ClientServiceDelegate clientDelegate;
   private static final String failString = "Rejected job";
 
+  @BeforeClass
+  public static void setupBeforeClass() {
+    ResourceUtils.resetResourceTypes(new Configuration());
+  }
+
   @Before
   public void setUp() throws Exception {
     resourceMgrDelegate = mock(ResourceMgrDelegate.class);
@@ -175,6 +240,7 @@ public class TestYARNRunner {
   @After
   public void cleanup() {
     FileUtil.fullyDelete(testWorkDir);
+    ResourceUtils.resetResourceTypes(new Configuration());
   }
 
   @Test(timeout=20000)
@@ -884,4 +950,105 @@ public class TestYARNRunner {
         .get("hadoop.tmp.dir").equals("testconfdir"));
     UserGroupInformation.reset();
   }
+
+  @Test
+  public void testCustomAMRMResourceType() throws Exception {
+    initResourceTypes();
+    String customResourceName = "a-custom-resource";
+
+    JobConf jobConf = new JobConf();
+
+    jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
+        customResourceName, 5);
+    jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
+
+    yarnRunner = new YARNRunner(jobConf);
+
+    submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+    List<ResourceRequest> resourceRequests =
+        submissionContext.getAMContainerResourceRequests();
+
+    Assert.assertEquals(1, resourceRequests.size());
+    ResourceRequest resourceRequest = resourceRequests.get(0);
+
+    ResourceInformation resourceInformation = resourceRequest.getCapability()
+        .getResourceInformation(customResourceName);
+    Assert.assertEquals("Expecting the default unit (G)",
+        "G", resourceInformation.getUnits());
+    Assert.assertEquals(5L, resourceInformation.getValue());
+    Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores());
+  }
+
+  @Test
+  public void testAMRMemoryRequest() throws Exception {
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      JobConf jobConf = new JobConf();
+      jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
+
+      yarnRunner = new YARNRunner(jobConf);
+
+      submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+      List<ResourceRequest> resourceRequests =
+          submissionContext.getAMContainerResourceRequests();
+
+      Assert.assertEquals(1, resourceRequests.size());
+      ResourceRequest resourceRequest = resourceRequests.get(0);
+
+      long memorySize = resourceRequest.getCapability().getMemorySize();
+      Assert.assertEquals(3072, memorySize);
+    }
+  }
+
+  @Test
+  public void testAMRMemoryRequestOverriding() throws Exception {
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      TestAppender testAppender = new TestAppender();
+      Logger logger = Logger.getLogger(YARNRunner.class);
+      logger.addAppender(testAppender);
+      try {
+        JobConf jobConf = new JobConf();
+        jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
+        jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048);
+
+        yarnRunner = new YARNRunner(jobConf);
+
+        submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+        List<ResourceRequest> resourceRequests =
+            submissionContext.getAMContainerResourceRequests();
+
+        Assert.assertEquals(1, resourceRequests.size());
+        ResourceRequest resourceRequest = resourceRequests.get(0);
+
+        long memorySize = resourceRequest.getCapability().getMemorySize();
+        Assert.assertEquals(3072, memorySize);
+        boolean foundLogWarning = false;
+        for (LoggingEvent e : testAppender.getLogEvents()) {
+          if (e.getLevel() == Level.WARN && ("Configuration " +
+                "yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " +
+                "overriding the yarn.app.mapreduce.am.resource.mb=2048 " +
+                "configuration").equals(e.getMessage())) {
+            foundLogWarning = true;
+            break;
+          }
+        }
+        assertTrue(foundLogWarning);
+      } finally {
+        logger.removeAppender(testAppender);
+      }
+    }
+  }
+
+  private void initResourceTypes() {
+    Configuration configuration = new Configuration();
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        CustomResourceTypesConfigurationProvider.class.getName());
+    ResourceUtils.resetResourceTypes(configuration);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index 65eb5a2..3806771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -44,7 +44,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
 
@@ -60,6 +63,8 @@ public class ResourceUtils {
 
   private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
   private static final String VCORES = ResourceInformation.VCORES.getName();
+  private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
+      Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$");
 
   private static volatile boolean initializedResources = false;
   private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
@@ -564,4 +569,43 @@ public class ResourceUtils {
     }
     return array;
   }
+
+  /**
+   * From a given configuration get all entries representing requested
+   * resources: entries that match the {prefix}{resourceName}={value}[{units}]
+   * pattern.
+   * @param configuration The configuration
+   * @param prefix Keys with this prefix are considered from the configuration
+   * @return The list of requested resources as described by the configuration
+   */
+  public static List<ResourceInformation> getRequestedResourcesFromConfig(
+      Configuration configuration, String prefix) {
+    List<ResourceInformation> result = new ArrayList<>();
+    Map<String, String> customResourcesMap = configuration
+        .getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$");
+    for (Entry<String, String> resource : customResourcesMap.entrySet()) {
+      String resourceName = resource.getKey().substring(prefix.length());
+      Matcher matcher =
+          RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue());
+      if (!matcher.matches()) {
+        String errorMsg = "Invalid resource request specified for property "
+            + resource.getKey() + ": \"" + resource.getValue()
+            + "\", expected format is: value[ ][units]";
+        LOG.error(errorMsg);
+        throw new IllegalArgumentException(errorMsg);
+      }
+      long value = Long.parseLong(matcher.group(1));
+      String unit = matcher.group(2);
+      if (unit.isEmpty()) {
+        unit = ResourceUtils.getDefaultUnit(resourceName);
+      }
+      ResourceInformation resourceInformation = new ResourceInformation();
+      resourceInformation.setName(resourceName);
+      resourceInformation.setValue(value);
+      resourceInformation.setUnits(unit);
+      result.add(resourceInformation);
+    }
+    return result;
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org