You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/03/21 00:47:02 UTC

[hadoop] 06/06: YARN-9271. Backport YARN-6927 for resource type support in MapReduce

This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit dcb7d7a451bc7d4f589fba5853746a0265bc8908
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Mar 20 17:46:35 2019 -0700

    YARN-9271. Backport YARN-6927 for resource type support in MapReduce
---
 .../mapreduce/v2/app/job/impl/TaskAttemptImpl.java | 141 +++++++-
 .../mapreduce/TestMapreduceConfigFields.java       |  11 +
 .../mapreduce/v2/app/job/impl/TestTaskAttempt.java | 365 ++++++++++++++++++++-
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  68 +++-
 .../java/org/apache/hadoop/mapred/YARNRunner.java  |  86 ++++-
 .../org/apache/hadoop/mapred/TestYARNRunner.java   | 167 ++++++++++
 .../hadoop/yarn/util/resource/ResourceUtils.java   |  44 +++
 7 files changed, 853 insertions(+), 29 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index dfc3adb..3f37d4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import static org.apache.commons.lang.StringUtils.isEmpty;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -139,6 +142,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -664,12 +669,8 @@ public abstract class TaskAttemptImpl implements
     this.jobFile = jobFile;
     this.partition = partition;
 
-    //TODO:create the resource reqt for this Task attempt
     this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
-    this.resourceCapability.setMemorySize(
-        getMemoryRequired(conf, taskId.getTaskType()));
-    this.resourceCapability.setVirtualCores(
-        getCpuRequired(conf, taskId.getTaskType()));
+    populateResourceCapability(taskId.getTaskType());
 
     this.dataLocalHosts = resolveHosts(dataLocalHosts);
     RackResolver.init(conf);
@@ -701,21 +702,133 @@ public abstract class TaskAttemptImpl implements
     return memory;
   }
 
+  private void populateResourceCapability(TaskType taskType) {
+    String resourceTypePrefix =
+        getResourceTypePrefix(taskType);
+    boolean memorySet = false;
+    boolean cpuVcoresSet = false;
+    if (resourceTypePrefix != null) {
+      List<ResourceInformation> resourceRequests =
+          ResourceUtils.getRequestedResourcesFromConfig(conf,
+              resourceTypePrefix);
+      for (ResourceInformation resourceRequest : resourceRequests) {
+        String resourceName = resourceRequest.getName();
+        if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
+            MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
+                resourceName)) {
+          if (memorySet) {
+            throw new IllegalArgumentException(
+                "Only one of the following keys " +
+                    "can be specified for a single job: " +
+                    MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
+                    MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
+          }
+          String units = isEmpty(resourceRequest.getUnits()) ?
+              ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
+                resourceRequest.getUnits();
+          this.resourceCapability.setMemorySize(
+              UnitsConversionUtil.convert(units, "Mi",
+                  resourceRequest.getValue()));
+          memorySet = true;
+          String memoryKey = getMemoryKey(taskType);
+          if (memoryKey != null && conf.get(memoryKey) != null) {
+            LOG.warn("Configuration " + resourceTypePrefix + resourceName +
+                "=" + resourceRequest.getValue() + resourceRequest.getUnits() +
+                " is overriding the " + memoryKey + "=" + conf.get(memoryKey) +
+                " configuration");
+          }
+        } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(
+            resourceName)) {
+          this.resourceCapability.setVirtualCores(
+              (int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "",
+                  resourceRequest.getValue()));
+          cpuVcoresSet = true;
+          String cpuKey = getCpuVcoresKey(taskType);
+          if (cpuKey != null && conf.get(cpuKey) != null) {
+            LOG.warn("Configuration " + resourceTypePrefix +
+                MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" +
+                resourceRequest.getValue() + resourceRequest.getUnits() +
+                " is overriding the " + cpuKey + "=" +
+                conf.get(cpuKey) + " configuration");
+          }
+        } else {
+          ResourceInformation resourceInformation =
+              this.resourceCapability.getResourceInformation(resourceName);
+          resourceInformation.setUnits(resourceRequest.getUnits());
+          resourceInformation.setValue(resourceRequest.getValue());
+          this.resourceCapability.setResourceInformation(resourceName,
+              resourceInformation);
+        }
+      }
+    }
+    if (!memorySet) {
+      this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType));
+    }
+    if (!cpuVcoresSet) {
+      this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
+    }
+  }
+
+  private String getCpuVcoresKey(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.MAP_CPU_VCORES;
+    case REDUCE:
+      return MRJobConfig.REDUCE_CPU_VCORES;
+    default:
+      return null;
+    }
+  }
+
+  private String getMemoryKey(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.MAP_MEMORY_MB;
+    case REDUCE:
+      return MRJobConfig.REDUCE_MEMORY_MB;
+    default:
+      return null;
+    }
+  }
+
+  private Integer getCpuVcoreDefault(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.DEFAULT_MAP_CPU_VCORES;
+    case REDUCE:
+      return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES;
+    default:
+      return null;
+    }
+  }
+
   private int getCpuRequired(Configuration conf, TaskType taskType) {
     int vcores = 1;
-    if (taskType == TaskType.MAP)  {
-      vcores =
-          conf.getInt(MRJobConfig.MAP_CPU_VCORES,
-              MRJobConfig.DEFAULT_MAP_CPU_VCORES);
-    } else if (taskType == TaskType.REDUCE) {
-      vcores =
-          conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
-              MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+    String cpuVcoreKey = getCpuVcoresKey(taskType);
+    if (cpuVcoreKey != null) {
+      Integer defaultCpuVcores = getCpuVcoreDefault(taskType);
+      if (null == defaultCpuVcores) {
+        defaultCpuVcores = vcores;
+      }
+      vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores);
     }
-    
     return vcores;
   }
 
+  private String getResourceTypePrefix(TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX;
+    case REDUCE:
+      return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX;
+    default:
+      LOG.info("TaskType " + taskType +
+          " does not support custom resource types - this support can be " +
+          "added in " + getClass().getSimpleName());
+      return null;
+    }
+  }
+
   /**
    * Create a {@link LocalResource} record with all the given parameters.
    * The NM that hosts AM container will upload resources to shared cache.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
index 096cec9..f469aad 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
@@ -78,6 +78,17 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("mapreduce.local.clientfactory.class.name");
     xmlPropsToSkipCompare.add("mapreduce.jobtracker.system.dir");
     xmlPropsToSkipCompare.add("mapreduce.jobtracker.staging.root.dir");
+
+    // Resource-type-related properties are only prefixes;
+    // each must be suffixed with a resource name
+    // in order to take effect.
+    // There is nothing to be added to mapred-default.xml.
+    configurationPropsToSkipCompare.add(
+        MRJobConfig.MR_AM_RESOURCE_PREFIX);
+    configurationPropsToSkipCompare.add(
+        MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
+    configurationPropsToSkipCompare.add(
+        MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
   }
 
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 60a2177..e055798 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -28,14 +28,21 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import com.google.common.base.Supplier;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -83,24 +91,36 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.common.collect.ImmutableList;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
-	
+
+  private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
+
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -108,6 +128,63 @@ public class TestTaskAttempt{
     }
   }
 
+  private static class CustomResourceTypesConfigurationProvider
+      extends LocalConfigurationProvider {
+
+    @Override
+    public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+        String name) throws YarnException, IOException {
+      if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+        return new ByteArrayInputStream(
+            ("<configuration>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types</name>\n" +
+            "   <value>a-custom-resource</value>\n" +
+            " </property>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types.a-custom-resource.units</name>\n" +
+            "   <value>G</value>\n" +
+            " </property>\n" +
+            "</configuration>\n").getBytes());
+      } else {
+        return super.getConfigurationInputStream(bootstrapConf, name);
+      }
+    }
+  }
+
+  private static class TestAppender extends AppenderSkeleton {
+
+    private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    protected void append(LoggingEvent arg0) {
+      logEvents.add(arg0);
+    }
+
+    private List<LoggingEvent> getLogEvents() {
+      return logEvents;
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() {
+    ResourceUtils.resetResourceTypes(new Configuration());
+  }
+
+  @After
+  public void tearDown() {
+    ResourceUtils.resetResourceTypes(new Configuration());
+  }
+
   @Test
   public void testMRAppHistoryForMap() throws Exception {
     MRApp app = new FailingAttemptsMRApp(1, 0);
@@ -329,17 +406,18 @@ public class TestTaskAttempt{
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
     Clock clock = SystemClock.getInstance();
-    return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
+    return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo,
+        clock, new JobConf());
   }
 
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
-      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
+      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo,
+      Clock clock, JobConf jobConf) {
     ApplicationId appId = ApplicationId.newInstance(1, 1);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     Path jobFile = mock(Path.class);
-    JobConf jobConf = new JobConf();
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
             taskSplitMetaInfo, jobConf, taListener, null,
@@ -347,6 +425,20 @@ public class TestTaskAttempt{
     return taImpl;
   }
 
+  private TaskAttemptImpl createReduceTaskAttemptImplForTest(
+      EventHandler eventHandler, Clock clock, JobConf jobConf) {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    Path jobFile = mock(Path.class);
+    TaskAttemptImpl taImpl =
+        new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            1, jobConf, taListener, null,
+            null, clock, null);
+    return taImpl;
+  }
+
   private void testMRAppHistory(MRApp app) throws Exception {
     Configuration conf = new Configuration();
     Job job = app.submit(conf);
@@ -1423,6 +1515,271 @@ public class TestTaskAttempt{
     assertFalse("InternalError occurred", eventHandler.internalError);
   }
 
+  @Test
+  public void testMapperCustomResourceTypes() {
+    initResourceTypes();
+    EventHandler eventHandler = mock(EventHandler.class);
+    TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo();
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX
+        + CUSTOM_RESOURCE_NAME, 7L);
+    TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler,
+        taskSplitMetaInfo, clock, jobConf);
+    ResourceInformation resourceInfo =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getResourceInformation(CUSTOM_RESOURCE_NAME);
+    assertEquals("Expecting the default unit (G)",
+        "G", resourceInfo.getUnits());
+    assertEquals(7L, resourceInfo.getValue());
+  }
+
+  @Test
+  public void testReducerCustomResourceTypes() {
+    initResourceTypes();
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+        + CUSTOM_RESOURCE_NAME, "3m");
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    ResourceInformation resourceInfo =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getResourceInformation(CUSTOM_RESOURCE_NAME);
+    assertEquals("Expecting the specified unit (m)",
+        "m", resourceInfo.getUnits());
+    assertEquals(3L, resourceInfo.getValue());
+  }
+
+  @Test
+  public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    long memorySize =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getMemorySize();
+    assertEquals(2048, memorySize);
+  }
+
+  @Test
+  public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi");
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    long memorySize =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getMemorySize();
+    assertEquals(2048, memorySize);
+  }
+
+  @Test
+  public void testReducerMemoryRequestDefaultMemory() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
+    long memorySize =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getMemorySize();
+    assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize);
+  }
+
+  @Test
+  public void testReducerMemoryRequestWithoutUnits() {
+    Clock clock = SystemClock.getInstance();
+    for (String memoryResourceName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      EventHandler eventHandler = mock(EventHandler.class);
+      JobConf jobConf = new JobConf();
+      jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+          memoryResourceName, 2048);
+      TaskAttemptImpl taImpl =
+          createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+      long memorySize =
+          getResourceInfoFromContainerRequest(taImpl, eventHandler).
+          getMemorySize();
+      assertEquals(2048, memorySize);
+    }
+  }
+
+  @Test
+  public void testReducerMemoryRequestOverriding() {
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      TestAppender testAppender = new TestAppender();
+      final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
+      try {
+        logger.addAppender(testAppender);
+        EventHandler eventHandler = mock(EventHandler.class);
+        Clock clock = SystemClock.getInstance();
+        JobConf jobConf = new JobConf();
+        jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
+            "3Gi");
+        jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+        TaskAttemptImpl taImpl =
+            createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+        long memorySize =
+            getResourceInfoFromContainerRequest(taImpl, eventHandler).
+            getMemorySize();
+        assertEquals(3072, memorySize);
+        boolean foundLogWarning = false;
+        for (LoggingEvent e : testAppender.getLogEvents()) {
+          if (e.getLevel() == Level.WARN && ("Configuration " +
+                "mapreduce.reduce.resource." + memoryName + "=3Gi is " +
+                "overriding the mapreduce.reduce.memory.mb=2048 configuration")
+          .equals(e.getMessage())) {
+            foundLogWarning = true;
+            break;
+          }
+        }
+        assertTrue(foundLogWarning);
+      } finally {
+        logger.removeAppender(testAppender);
+      }
+    }
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testReducerMemoryRequestMultipleName() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
+          "3Gi");
+    }
+    createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+  }
+
+  @Test
+  public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3);
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    int vCores =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getVirtualCores();
+    assertEquals(3, vCores);
+  }
+
+  @Test
+  public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+        MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5");
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+    int vCores =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getVirtualCores();
+    assertEquals(5, vCores);
+  }
+
+  @Test
+  public void testReducerCpuRequestDefaultMemory() {
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    TaskAttemptImpl taImpl =
+        createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
+    int vCores =
+        getResourceInfoFromContainerRequest(taImpl, eventHandler).
+        getVirtualCores();
+    assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores);
+  }
+
+  @Test
+  public void testReducerCpuRequestOverriding() {
+    TestAppender testAppender = new TestAppender();
+    final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
+    try {
+      logger.addAppender(testAppender);
+      EventHandler eventHandler = mock(EventHandler.class);
+      Clock clock = SystemClock.getInstance();
+      JobConf jobConf = new JobConf();
+      jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+          MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7");
+      jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9);
+      TaskAttemptImpl taImpl =
+          createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+      long vCores =
+          getResourceInfoFromContainerRequest(taImpl, eventHandler).
+          getVirtualCores();
+      assertEquals(7, vCores);
+      boolean foundLogWarning = false;
+      for (LoggingEvent e : testAppender.getLogEvents()) {
+        if (e.getLevel() == Level.WARN && ("Configuration " +
+              "mapreduce.reduce.resource.vcores=7 is overriding the " +
+              "mapreduce.reduce.cpu.vcores=9 configuration"
+        ).equals(e.getMessage())) {
+          foundLogWarning = true;
+          break;
+        }
+      }
+      assertTrue(foundLogWarning);
+    } finally {
+      logger.removeAppender(testAppender);
+    }
+  }
+
+  private Resource getResourceInfoFromContainerRequest(
+      TaskAttemptImpl taImpl, EventHandler eventHandler) {
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_SCHEDULE));
+
+    assertEquals("Task attempt is not in STARTING state", taImpl.getState(),
+        TaskAttemptState.STARTING);
+
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(captor.capture());
+
+    List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>();
+    for (Event e : captor.getAllValues()) {
+      if (e instanceof ContainerRequestEvent) {
+        containerRequestEvents.add((ContainerRequestEvent) e);
+      }
+    }
+    assertEquals("Expected one ContainerRequestEvent after scheduling "
+        + "task attempt", 1, containerRequestEvents.size());
+
+    return containerRequestEvents.get(0).getCapability();
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testReducerCustomResourceTypeWithInvalidUnit() {
+    initResourceTypes();
+    EventHandler eventHandler = mock(EventHandler.class);
+    Clock clock = SystemClock.getInstance();
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+        + CUSTOM_RESOURCE_NAME, "3z");
+    createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+  }
+
+  private void initResourceTypes() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        CustomResourceTypesConfigurationProvider.class.getName());
+    ResourceUtils.resetResourceTypes(conf);
+  }
+
   private void setupTaskAttemptFinishingMonitor(
       EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
     TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index d666123..5a72def 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -360,12 +360,47 @@ public interface MRJobConfig {
 
   public static final String MAP_INPUT_START = "mapreduce.map.input.start";
 
+  /**
+   * Configuration key for specifying memory requirement for the mapper.
+   * Kept for backward-compatibility, mapreduce.map.resource.memory
+   * is the new preferred way to specify this.
+   */
   public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
   public static final int DEFAULT_MAP_MEMORY_MB = 1024;
 
+  /**
+   * Configuration key for specifying CPU requirement for the mapper.
+   * Kept for backward-compatibility, mapreduce.map.resource.vcores
+   * is the new preferred way to specify this.
+   */
   public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
   public static final int DEFAULT_MAP_CPU_VCORES = 1;
 
+  /**
+   * Custom resource names required by the mapper should be
+   * appended to this prefix, the value's format is {amount}[ ][{unit}].
+   * If no unit is defined, the default unit will be used.
+   * Standard resource names: memory (default unit: Mi), vcores
+   */
+  public static final String MAP_RESOURCE_TYPE_PREFIX =
+      "mapreduce.map.resource.";
+
+  /**
+   * Resource type name for CPU vcores.
+   */
+  public static final String RESOURCE_TYPE_NAME_VCORE = "vcores";
+
+  /**
+   * Resource type name for memory.
+   */
+  public static final String RESOURCE_TYPE_NAME_MEMORY = "memory";
+
+  /**
+   * Alternative resource type name for memory.
+   */
+  public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY =
+      "memory-mb";
+
   public static final String MAP_ENV = "mapreduce.map.env";
 
   public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@@ -408,12 +443,31 @@ public interface MRJobConfig {
 
   public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
 
+  /**
+   * Configuration key for specifying memory requirement for the reducer.
+   * Kept for backward-compatibility, mapreduce.reduce.resource.memory
+   * is the new preferred way to specify this.
+   */
   public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
   public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
 
+  /**
+   * Configuration key for specifying CPU requirement for the reducer.
+   * Kept for backward-compatibility, mapreduce.reduce.resource.vcores
+   * is the new preferred way to specify this.
+   */
   public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
   public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
 
+  /**
+   * Resource names required by the reducer should be
+   * appended to this prefix, the value's format is {amount}[ ][{unit}].
+   * If no unit is defined, the default unit will be used.
+   * Standard resource names: memory (default unit: Mi), vcores
+   */
+  public static final String REDUCE_RESOURCE_TYPE_PREFIX =
+      "mapreduce.reduce.resource.";
+
   public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
 
   public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
@@ -599,7 +653,10 @@ public interface MRJobConfig {
   public static final String DEFAULT_MR_AM_STAGING_DIR = 
     "/tmp/hadoop-yarn/staging";
 
-  /** The amount of memory the MR app master needs.*/
+  /** The amount of memory the MR app master needs.
+   * Kept for backward-compatibility, yarn.app.mapreduce.am.resource.memory is
+   * the new preferred way to specify this.
+   */
   public static final String MR_AM_VMEM_MB =
     MR_AM_PREFIX+"resource.mb";
   public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
@@ -609,6 +666,15 @@ public interface MRJobConfig {
     MR_AM_PREFIX+"resource.cpu-vcores";
   public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
 
+  /**
+   * Resource names required by the MR AM should be
+   * appended to this prefix, the value's format is {amount}[ ][{unit}].
+   * If no unit is defined, the default unit will be used.
+   * Standard resource names: memory (default unit: Mi), vcores
+   */
+  public static final String MR_AM_RESOURCE_PREFIX =
+      MR_AM_PREFIX + "resource.";
+
   /** Command line arguments passed to the MR app master.*/
   public static final String MR_AM_COMMAND_OPTS =
     MR_AM_PREFIX+"command-opts";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index a23ff34..12a3079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.apache.commons.lang.StringUtils.isEmpty;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -93,6 +97,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -659,16 +665,76 @@ public class YARNRunner implements ClientProtocol {
 
   private List<ResourceRequest> generateResourceRequests() throws IOException {
     Resource capability = recordFactory.newRecordInstance(Resource.class);
-    capability.setMemorySize(
-        conf.getInt(
-            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
-        )
-    );
-    capability.setVirtualCores(
-        conf.getInt(
-            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
-        )
-    );
+    boolean memorySet = false;
+    boolean cpuVcoresSet = false;
+    List<ResourceInformation> resourceRequests = ResourceUtils
+        .getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX);
+    for (ResourceInformation resourceReq : resourceRequests) {
+      String resourceName = resourceReq.getName();
+      if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
+          MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
+              resourceName)) {
+        if (memorySet) {
+          throw new IllegalArgumentException(
+              "Only one of the following keys " +
+                  "can be specified for a single job: " +
+                  MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
+                  MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
+        }
+        String units = isEmpty(resourceReq.getUnits()) ?
+            ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
+              resourceReq.getUnits();
+        capability.setMemorySize(
+            UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue()));
+        memorySet = true;
+        if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) {
+          LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
+              resourceName + "=" + resourceReq.getValue() +
+              resourceReq.getUnits() + " is overriding the " +
+              MRJobConfig.MR_AM_VMEM_MB + "=" +
+              conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration");
+        }
+      } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) {
+        capability.setVirtualCores(
+            (int) UnitsConversionUtil.convert(resourceReq.getUnits(), "",
+                resourceReq.getValue()));
+        cpuVcoresSet = true;
+        if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) {
+          LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
+              resourceName + "=" + resourceReq.getValue() +
+              resourceReq.getUnits() + " is overriding the " +
+              MRJobConfig.MR_AM_CPU_VCORES + "=" +
+              conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration");
+        }
+      } else if (!MRJobConfig.MR_AM_VMEM_MB.equals(
+          MR_AM_RESOURCE_PREFIX + resourceName) &&
+          !MRJobConfig.MR_AM_CPU_VCORES.equals(
+              MR_AM_RESOURCE_PREFIX + resourceName)) {
+        // the "mb", "cpu-vcores" resource types are not processed here
+        // since the yarn.app.mapreduce.am.resource.mb,
+        // yarn.app.mapreduce.am.resource.cpu-vcores keys are used for
+        // backward-compatibility - which is handled after this loop
+        ResourceInformation resourceInformation = capability
+            .getResourceInformation(resourceName);
+        resourceInformation.setUnits(resourceReq.getUnits());
+        resourceInformation.setValue(resourceReq.getValue());
+        capability.setResourceInformation(resourceName, resourceInformation);
+      }
+    }
+    if (!memorySet) {
+      capability.setMemorySize(
+          conf.getInt(
+              MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+          )
+      );
+    }
+    if (!cpuVcoresSet) {
+      capability.setVirtualCores(
+          conf.getInt(
+              MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+          )
+      );
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("AppMaster capability = " + capability);
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c79b08e..ecb396e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -32,10 +32,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -43,6 +45,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,6 +72,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -96,28 +100,37 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.log4j.WriterAppender;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Test YarnRunner and make sure the client side plugin works
  * fine
@@ -131,6 +144,53 @@ public class TestYARNRunner {
       MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
           MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
 
+  private static class CustomResourceTypesConfigurationProvider
+      extends LocalConfigurationProvider {
+
+    @Override
+    public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+        String name) throws YarnException, IOException {
+      if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+        return new ByteArrayInputStream(
+            ("<configuration>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types</name>\n" +
+            "   <value>a-custom-resource</value>\n" +
+            " </property>\n" +
+            " <property>\n" +
+            "   <name>yarn.resource-types.a-custom-resource.units</name>\n" +
+            "   <value>G</value>\n" +
+            " </property>\n" +
+            "</configuration>\n").getBytes());
+      } else {
+        return super.getConfigurationInputStream(bootstrapConf, name);
+      }
+    }
+  }
+
+  private static class TestAppender extends AppenderSkeleton {
+
+    private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    protected void append(LoggingEvent arg0) {
+      logEvents.add(arg0);
+    }
+
+    private List<LoggingEvent> getLogEvents() {
+      return logEvents;
+    }
+  }
+
   private YARNRunner yarnRunner;
   private ResourceMgrDelegate resourceMgrDelegate;
   private YarnConfiguration conf;
@@ -143,6 +203,11 @@ public class TestYARNRunner {
   private  ClientServiceDelegate clientDelegate;
   private static final String failString = "Rejected job";
 
+  @BeforeClass
+  public static void setupBeforeClass() {
+    ResourceUtils.resetResourceTypes(new Configuration());
+  }
+
   @Before
   public void setUp() throws Exception {
     resourceMgrDelegate = mock(ResourceMgrDelegate.class);
@@ -175,6 +240,7 @@ public class TestYARNRunner {
   @After
   public void cleanup() {
     FileUtil.fullyDelete(testWorkDir);
+    ResourceUtils.resetResourceTypes(new Configuration());
   }
 
   @Test(timeout=20000)
@@ -884,4 +950,105 @@ public class TestYARNRunner {
         .get("hadoop.tmp.dir").equals("testconfdir"));
     UserGroupInformation.reset();
   }
+
+  @Test
+  public void testCustomAMRMResourceType() throws Exception {
+    initResourceTypes();
+    String customResourceName = "a-custom-resource";
+
+    JobConf jobConf = new JobConf();
+
+    jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
+        customResourceName, 5);
+    jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
+
+    yarnRunner = new YARNRunner(jobConf);
+
+    submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+    List<ResourceRequest> resourceRequests =
+        submissionContext.getAMContainerResourceRequests();
+
+    Assert.assertEquals(1, resourceRequests.size());
+    ResourceRequest resourceRequest = resourceRequests.get(0);
+
+    ResourceInformation resourceInformation = resourceRequest.getCapability()
+        .getResourceInformation(customResourceName);
+    Assert.assertEquals("Expecting the default unit (G)",
+        "G", resourceInformation.getUnits());
+    Assert.assertEquals(5L, resourceInformation.getValue());
+    Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores());
+  }
+
+  @Test
+  public void testAMRMemoryRequest() throws Exception {
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      JobConf jobConf = new JobConf();
+      jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
+
+      yarnRunner = new YARNRunner(jobConf);
+
+      submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+      List<ResourceRequest> resourceRequests =
+          submissionContext.getAMContainerResourceRequests();
+
+      Assert.assertEquals(1, resourceRequests.size());
+      ResourceRequest resourceRequest = resourceRequests.get(0);
+
+      long memorySize = resourceRequest.getCapability().getMemorySize();
+      Assert.assertEquals(3072, memorySize);
+    }
+  }
+
+  @Test
+  public void testAMRMemoryRequestOverriding() throws Exception {
+    for (String memoryName : ImmutableList.of(
+        MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+        MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+      TestAppender testAppender = new TestAppender();
+      Logger logger = Logger.getLogger(YARNRunner.class);
+      logger.addAppender(testAppender);
+      try {
+        JobConf jobConf = new JobConf();
+        jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
+        jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048);
+
+        yarnRunner = new YARNRunner(jobConf);
+
+        submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+        List<ResourceRequest> resourceRequests =
+            submissionContext.getAMContainerResourceRequests();
+
+        Assert.assertEquals(1, resourceRequests.size());
+        ResourceRequest resourceRequest = resourceRequests.get(0);
+
+        long memorySize = resourceRequest.getCapability().getMemorySize();
+        Assert.assertEquals(3072, memorySize);
+        boolean foundLogWarning = false;
+        for (LoggingEvent e : testAppender.getLogEvents()) {
+          if (e.getLevel() == Level.WARN && ("Configuration " +
+                "yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " +
+                "overriding the yarn.app.mapreduce.am.resource.mb=2048 " +
+                "configuration").equals(e.getMessage())) {
+            foundLogWarning = true;
+            break;
+          }
+        }
+        assertTrue(foundLogWarning);
+      } finally {
+        logger.removeAppender(testAppender);
+      }
+    }
+  }
+
+  private void initResourceTypes() {
+    Configuration configuration = new Configuration();
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        CustomResourceTypesConfigurationProvider.class.getName());
+    ResourceUtils.resetResourceTypes(configuration);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index 65eb5a2..3806771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -44,7 +44,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
 
@@ -60,6 +63,8 @@ public class ResourceUtils {
 
   private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
   private static final String VCORES = ResourceInformation.VCORES.getName();
+  private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
+      Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$");
 
   private static volatile boolean initializedResources = false;
   private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
@@ -564,4 +569,43 @@ public class ResourceUtils {
     }
     return array;
   }
+
+  /**
+   * Gets all requested resources from the given configuration: entries that
+   * match the {prefix}{resourceName}={value}[ ][{units}] pattern.
+   * @param configuration The configuration
+   * @param prefix Keys with this prefix are considered from the configuration
+   * @return The list of requested resources as described by the configuration
+   * @throws IllegalArgumentException if a matching entry has a malformed value
+   */
+  public static List<ResourceInformation> getRequestedResourcesFromConfig(
+      Configuration configuration, String prefix) {
+    List<ResourceInformation> result = new ArrayList<>();
+    Map<String, String> customResourcesMap = configuration
+        .getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$");
+    for (Entry<String, String> resource : customResourcesMap.entrySet()) {
+      String resourceName = resource.getKey().substring(prefix.length());
+      Matcher matcher =
+          RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue());
+      if (!matcher.matches()) {
+        String errorMsg = "Invalid resource request specified for property "
+            + resource.getKey() + ": \"" + resource.getValue()
+            + "\", expected format is: value[ ][units]";
+        LOG.error(errorMsg);
+        throw new IllegalArgumentException(errorMsg);
+      }
+      long value = Long.parseLong(matcher.group(1));
+      String unit = matcher.group(2);
+      if (unit.isEmpty()) {
+        unit = ResourceUtils.getDefaultUnit(resourceName);
+      }
+      ResourceInformation resourceInformation = new ResourceInformation();
+      resourceInformation.setName(resourceName);
+      resourceInformation.setValue(value);
+      resourceInformation.setUnits(unit);
+      result.add(resourceInformation);
+    }
+    return result;
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org