You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/03/21 00:47:02 UTC
[hadoop] 06/06: YARN-9271. Backport YARN-6927 for resource type
support in MapReduce
This is an automated email from the ASF dual-hosted git repository.
jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit dcb7d7a451bc7d4f589fba5853746a0265bc8908
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Mar 20 17:46:35 2019 -0700
YARN-9271. Backport YARN-6927 for resource type support in MapReduce
---
.../mapreduce/v2/app/job/impl/TaskAttemptImpl.java | 141 +++++++-
.../mapreduce/TestMapreduceConfigFields.java | 11 +
.../mapreduce/v2/app/job/impl/TestTaskAttempt.java | 365 ++++++++++++++++++++-
.../org/apache/hadoop/mapreduce/MRJobConfig.java | 68 +++-
.../java/org/apache/hadoop/mapred/YARNRunner.java | 86 ++++-
.../org/apache/hadoop/mapred/TestYARNRunner.java | 167 ++++++++++
.../hadoop/yarn/util/resource/ResourceUtils.java | 44 +++
7 files changed, 853 insertions(+), 29 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index dfc3adb..3f37d4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
+import static org.apache.commons.lang.StringUtils.isEmpty;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -139,6 +142,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -664,12 +669,8 @@ public abstract class TaskAttemptImpl implements
this.jobFile = jobFile;
this.partition = partition;
- //TODO:create the resource reqt for this Task attempt
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
- this.resourceCapability.setMemorySize(
- getMemoryRequired(conf, taskId.getTaskType()));
- this.resourceCapability.setVirtualCores(
- getCpuRequired(conf, taskId.getTaskType()));
+ populateResourceCapability(taskId.getTaskType());
this.dataLocalHosts = resolveHosts(dataLocalHosts);
RackResolver.init(conf);
@@ -701,21 +702,133 @@ public abstract class TaskAttemptImpl implements
return memory;
}
+ private void populateResourceCapability(TaskType taskType) {
+ String resourceTypePrefix =
+ getResourceTypePrefix(taskType);
+ boolean memorySet = false;
+ boolean cpuVcoresSet = false;
+ if (resourceTypePrefix != null) {
+ List<ResourceInformation> resourceRequests =
+ ResourceUtils.getRequestedResourcesFromConfig(conf,
+ resourceTypePrefix);
+ for (ResourceInformation resourceRequest : resourceRequests) {
+ String resourceName = resourceRequest.getName();
+ if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
+ resourceName)) {
+ if (memorySet) {
+ throw new IllegalArgumentException(
+ "Only one of the following keys " +
+ "can be specified for a single job: " +
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
+ }
+ String units = isEmpty(resourceRequest.getUnits()) ?
+ ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
+ resourceRequest.getUnits();
+ this.resourceCapability.setMemorySize(
+ UnitsConversionUtil.convert(units, "Mi",
+ resourceRequest.getValue()));
+ memorySet = true;
+ String memoryKey = getMemoryKey(taskType);
+ if (memoryKey != null && conf.get(memoryKey) != null) {
+ LOG.warn("Configuration " + resourceTypePrefix + resourceName +
+ "=" + resourceRequest.getValue() + resourceRequest.getUnits() +
+ " is overriding the " + memoryKey + "=" + conf.get(memoryKey) +
+ " configuration");
+ }
+ } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(
+ resourceName)) {
+ this.resourceCapability.setVirtualCores(
+ (int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "",
+ resourceRequest.getValue()));
+ cpuVcoresSet = true;
+ String cpuKey = getCpuVcoresKey(taskType);
+ if (cpuKey != null && conf.get(cpuKey) != null) {
+ LOG.warn("Configuration " + resourceTypePrefix +
+ MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" +
+ resourceRequest.getValue() + resourceRequest.getUnits() +
+ " is overriding the " + cpuKey + "=" +
+ conf.get(cpuKey) + " configuration");
+ }
+ } else {
+ ResourceInformation resourceInformation =
+ this.resourceCapability.getResourceInformation(resourceName);
+ resourceInformation.setUnits(resourceRequest.getUnits());
+ resourceInformation.setValue(resourceRequest.getValue());
+ this.resourceCapability.setResourceInformation(resourceName,
+ resourceInformation);
+ }
+ }
+ }
+ if (!memorySet) {
+ this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType));
+ }
+ if (!cpuVcoresSet) {
+ this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
+ }
+ }
+
+ private String getCpuVcoresKey(TaskType taskType) {
+ switch (taskType) {
+ case MAP:
+ return MRJobConfig.MAP_CPU_VCORES;
+ case REDUCE:
+ return MRJobConfig.REDUCE_CPU_VCORES;
+ default:
+ return null;
+ }
+ }
+
+ private String getMemoryKey(TaskType taskType) {
+ switch (taskType) {
+ case MAP:
+ return MRJobConfig.MAP_MEMORY_MB;
+ case REDUCE:
+ return MRJobConfig.REDUCE_MEMORY_MB;
+ default:
+ return null;
+ }
+ }
+
+ private Integer getCpuVcoreDefault(TaskType taskType) {
+ switch (taskType) {
+ case MAP:
+ return MRJobConfig.DEFAULT_MAP_CPU_VCORES;
+ case REDUCE:
+ return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES;
+ default:
+ return null;
+ }
+ }
+
private int getCpuRequired(Configuration conf, TaskType taskType) {
int vcores = 1;
- if (taskType == TaskType.MAP) {
- vcores =
- conf.getInt(MRJobConfig.MAP_CPU_VCORES,
- MRJobConfig.DEFAULT_MAP_CPU_VCORES);
- } else if (taskType == TaskType.REDUCE) {
- vcores =
- conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
- MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ String cpuVcoreKey = getCpuVcoresKey(taskType);
+ if (cpuVcoreKey != null) {
+ Integer defaultCpuVcores = getCpuVcoreDefault(taskType);
+ if (null == defaultCpuVcores) {
+ defaultCpuVcores = vcores;
+ }
+ vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores);
}
-
return vcores;
}
+ private String getResourceTypePrefix(TaskType taskType) {
+ switch (taskType) {
+ case MAP:
+ return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX;
+ case REDUCE:
+ return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX;
+ default:
+ LOG.info("TaskType " + taskType +
+ " does not support custom resource types - this support can be " +
+ "added in " + getClass().getSimpleName());
+ return null;
+ }
+ }
+
/**
* Create a {@link LocalResource} record with all the given parameters.
* The NM that hosts AM container will upload resources to shared cache.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
index 096cec9..f469aad 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
@@ -78,6 +78,17 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
xmlPropsToSkipCompare.add("mapreduce.local.clientfactory.class.name");
xmlPropsToSkipCompare.add("mapreduce.jobtracker.system.dir");
xmlPropsToSkipCompare.add("mapreduce.jobtracker.staging.root.dir");
+
+ // Resource-type-related properties are only prefixes;
+ // they must be suffixed with a resource name
+ // in order to take effect, so there is
+ // nothing to add to mapred-default.xml for them.
+ configurationPropsToSkipCompare.add(
+ MRJobConfig.MR_AM_RESOURCE_PREFIX);
+ configurationPropsToSkipCompare.add(
+ MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
+ configurationPropsToSkipCompare.add(
+ MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 60a2177..e055798 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -28,14 +28,21 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.base.Supplier;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +50,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -83,24 +91,36 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import com.google.common.collect.ImmutableList;
+
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestTaskAttempt{
-
+
+ private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
+
static public class StubbedFS extends RawLocalFileSystem {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
@@ -108,6 +128,63 @@ public class TestTaskAttempt{
}
}
+ private static class CustomResourceTypesConfigurationProvider
+ extends LocalConfigurationProvider {
+
+ @Override
+ public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+ String name) throws YarnException, IOException {
+ if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+ return new ByteArrayInputStream(
+ ("<configuration>\n" +
+ " <property>\n" +
+ " <name>yarn.resource-types</name>\n" +
+ " <value>a-custom-resource</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>yarn.resource-types.a-custom-resource.units</name>\n" +
+ " <value>G</value>\n" +
+ " </property>\n" +
+ "</configuration>\n").getBytes());
+ } else {
+ return super.getConfigurationInputStream(bootstrapConf, name);
+ }
+ }
+ }
+
+ private static class TestAppender extends AppenderSkeleton {
+
+ private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ protected void append(LoggingEvent arg0) {
+ logEvents.add(arg0);
+ }
+
+ private List<LoggingEvent> getLogEvents() {
+ return logEvents;
+ }
+ }
+
+ @BeforeClass
+ public static void setupBeforeClass() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ }
+
+ @After
+ public void tearDown() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ }
+
@Test
public void testMRAppHistoryForMap() throws Exception {
MRApp app = new FailingAttemptsMRApp(1, 0);
@@ -329,17 +406,18 @@ public class TestTaskAttempt{
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = SystemClock.getInstance();
- return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
+ return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo,
+ clock, new JobConf());
}
private TaskAttemptImpl createMapTaskAttemptImplForTest(
- EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
+ EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo,
+ Clock clock, JobConf jobConf) {
ApplicationId appId = ApplicationId.newInstance(1, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Path jobFile = mock(Path.class);
- JobConf jobConf = new JobConf();
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
taskSplitMetaInfo, jobConf, taListener, null,
@@ -347,6 +425,20 @@ public class TestTaskAttempt{
return taImpl;
}
+ private TaskAttemptImpl createReduceTaskAttemptImplForTest(
+ EventHandler eventHandler, Clock clock, JobConf jobConf) {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ Path jobFile = mock(Path.class);
+ TaskAttemptImpl taImpl =
+ new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ 1, jobConf, taListener, null,
+ null, clock, null);
+ return taImpl;
+ }
+
private void testMRAppHistory(MRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
@@ -1423,6 +1515,271 @@ public class TestTaskAttempt{
assertFalse("InternalError occurred", eventHandler.internalError);
}
+ @Test
+ public void testMapperCustomResourceTypes() {
+ initResourceTypes();
+ EventHandler eventHandler = mock(EventHandler.class);
+ TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo();
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX
+ + CUSTOM_RESOURCE_NAME, 7L);
+ TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler,
+ taskSplitMetaInfo, clock, jobConf);
+ ResourceInformation resourceInfo =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getResourceInformation(CUSTOM_RESOURCE_NAME);
+ assertEquals("Expecting the default unit (G)",
+ "G", resourceInfo.getUnits());
+ assertEquals(7L, resourceInfo.getValue());
+ }
+
+ @Test
+ public void testReducerCustomResourceTypes() {
+ initResourceTypes();
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+ + CUSTOM_RESOURCE_NAME, "3m");
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ ResourceInformation resourceInfo =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getResourceInformation(CUSTOM_RESOURCE_NAME);
+ assertEquals("Expecting the specified unit (m)",
+ "m", resourceInfo.getUnits());
+ assertEquals(3L, resourceInfo.getValue());
+ }
+
+ @Test
+ public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ long memorySize =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getMemorySize();
+ assertEquals(2048, memorySize);
+ }
+
+ @Test
+ public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi");
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ long memorySize =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getMemorySize();
+ assertEquals(2048, memorySize);
+ }
+
+ @Test
+ public void testReducerMemoryRequestDefaultMemory() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
+ long memorySize =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getMemorySize();
+ assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize);
+ }
+
+ @Test
+ public void testReducerMemoryRequestWithoutUnits() {
+ Clock clock = SystemClock.getInstance();
+ for (String memoryResourceName : ImmutableList.of(
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+ EventHandler eventHandler = mock(EventHandler.class);
+ JobConf jobConf = new JobConf();
+ jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+ memoryResourceName, 2048);
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ long memorySize =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getMemorySize();
+ assertEquals(2048, memorySize);
+ }
+ }
+
+ @Test
+ public void testReducerMemoryRequestOverriding() {
+ for (String memoryName : ImmutableList.of(
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+ TestAppender testAppender = new TestAppender();
+ final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
+ try {
+ logger.addAppender(testAppender);
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
+ "3Gi");
+ jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ long memorySize =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getMemorySize();
+ assertEquals(3072, memorySize);
+ boolean foundLogWarning = false;
+ for (LoggingEvent e : testAppender.getLogEvents()) {
+ if (e.getLevel() == Level.WARN && ("Configuration " +
+ "mapreduce.reduce.resource." + memoryName + "=3Gi is " +
+ "overriding the mapreduce.reduce.memory.mb=2048 configuration")
+ .equals(e.getMessage())) {
+ foundLogWarning = true;
+ break;
+ }
+ }
+ assertTrue(foundLogWarning);
+ } finally {
+ logger.removeAppender(testAppender);
+ }
+ }
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testReducerMemoryRequestMultipleName() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ for (String memoryName : ImmutableList.of(
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
+ "3Gi");
+ }
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ }
+
+ @Test
+ public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3);
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ int vCores =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getVirtualCores();
+ assertEquals(3, vCores);
+ }
+
+ @Test
+ public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+ MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5");
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ int vCores =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getVirtualCores();
+ assertEquals(5, vCores);
+ }
+
+ @Test
+ public void testReducerCpuRequestDefaultMemory() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
+ int vCores =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getVirtualCores();
+ assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores);
+ }
+
+ @Test
+ public void testReducerCpuRequestOverriding() {
+ TestAppender testAppender = new TestAppender();
+ final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
+ try {
+ logger.addAppender(testAppender);
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
+ MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7");
+ jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9);
+ TaskAttemptImpl taImpl =
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ long vCores =
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
+ getVirtualCores();
+ assertEquals(7, vCores);
+ boolean foundLogWarning = false;
+ for (LoggingEvent e : testAppender.getLogEvents()) {
+ if (e.getLevel() == Level.WARN && ("Configuration " +
+ "mapreduce.reduce.resource.vcores=7 is overriding the " +
+ "mapreduce.reduce.cpu.vcores=9 configuration"
+ ).equals(e.getMessage())) {
+ foundLogWarning = true;
+ break;
+ }
+ }
+ assertTrue(foundLogWarning);
+ } finally {
+ logger.removeAppender(testAppender);
+ }
+ }
+
+ private Resource getResourceInfoFromContainerRequest(
+ TaskAttemptImpl taImpl, EventHandler eventHandler) {
+ taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+ TaskAttemptEventType.TA_SCHEDULE));
+
+ assertEquals("Task attempt is not in STARTING state", taImpl.getState(),
+ TaskAttemptState.STARTING);
+
+ ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(captor.capture());
+
+ List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>();
+ for (Event e : captor.getAllValues()) {
+ if (e instanceof ContainerRequestEvent) {
+ containerRequestEvents.add((ContainerRequestEvent) e);
+ }
+ }
+ assertEquals("Expected one ContainerRequestEvent after scheduling "
+ + "task attempt", 1, containerRequestEvents.size());
+
+ return containerRequestEvents.get(0).getCapability();
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testReducerCustomResourceTypeWithInvalidUnit() {
+ initResourceTypes();
+ EventHandler eventHandler = mock(EventHandler.class);
+ Clock clock = SystemClock.getInstance();
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+ + CUSTOM_RESOURCE_NAME, "3z");
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
+ }
+
+ private void initResourceTypes() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ CustomResourceTypesConfigurationProvider.class.getName());
+ ResourceUtils.resetResourceTypes(conf);
+ }
+
private void setupTaskAttemptFinishingMonitor(
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index d666123..5a72def 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -360,12 +360,47 @@ public interface MRJobConfig {
public static final String MAP_INPUT_START = "mapreduce.map.input.start";
+ /**
+ * Configuration key for specifying the memory requirement of the mapper.
+ * Kept for backward compatibility; mapreduce.map.resource.memory
+ * is the new preferred way to specify this.
+ */
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
+ /**
+ * Configuration key for specifying the CPU requirement of the mapper.
+ * Kept for backward compatibility; mapreduce.map.resource.vcores
+ * is the new preferred way to specify this.
+ */
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
public static final int DEFAULT_MAP_CPU_VCORES = 1;
+ /**
+ * Custom resource names required by the mapper should be
+ * appended to this prefix; the value's format is {amount}[ ][{unit}].
+ * If no unit is defined, the default unit will be used.
+ * Standard resource names: memory or memory-mb (default unit: Mi), vcores.
+ */
+ public static final String MAP_RESOURCE_TYPE_PREFIX =
+ "mapreduce.map.resource.";
+
+ /**
+ * Resource type name for CPU vcores.
+ */
+ public static final String RESOURCE_TYPE_NAME_VCORE = "vcores";
+
+ /**
+ * Resource type name for memory.
+ */
+ public static final String RESOURCE_TYPE_NAME_MEMORY = "memory";
+
+ /**
+ * Alternative resource type name for memory.
+ */
+ public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY =
+ "memory-mb";
+
public static final String MAP_ENV = "mapreduce.map.env";
public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@@ -408,12 +443,31 @@ public interface MRJobConfig {
public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
+ /**
+ * Configuration key for specifying the memory requirement of the reducer.
+ * Kept for backward compatibility; mapreduce.reduce.resource.memory
+ * is the new preferred way to specify this.
+ */
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
+ /**
+ * Configuration key for specifying the CPU requirement of the reducer.
+ * Kept for backward compatibility; mapreduce.reduce.resource.vcores
+ * is the new preferred way to specify this.
+ */
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
+ /**
+ * Resource names required by the reducer should be
+ * appended to this prefix; the value's format is {amount}[ ][{unit}].
+ * If no unit is defined, the default unit will be used.
+ * Standard resource names: memory or memory-mb (default unit: Mi), vcores.
+ */
+ public static final String REDUCE_RESOURCE_TYPE_PREFIX =
+ "mapreduce.reduce.resource.";
+
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
@@ -599,7 +653,10 @@ public interface MRJobConfig {
public static final String DEFAULT_MR_AM_STAGING_DIR =
"/tmp/hadoop-yarn/staging";
- /** The amount of memory the MR app master needs.*/
+ /** The amount of memory the MR app master needs.
+ * Kept for backward compatibility; yarn.app.mapreduce.am.resource.memory is
+ * the new preferred way to specify this.
+ */
public static final String MR_AM_VMEM_MB =
MR_AM_PREFIX+"resource.mb";
public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
@@ -609,6 +666,15 @@ public interface MRJobConfig {
MR_AM_PREFIX+"resource.cpu-vcores";
public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
+ /**
+ * Resource names required by the MR AM should be
+ * appended to this prefix; the value's format is {amount}[ ][{unit}].
+ * If no unit is defined, the default unit will be used.
+ * Standard resource names: memory or memory-mb (default unit: Mi), vcores.
+ */
+ public static final String MR_AM_RESOURCE_PREFIX =
+ MR_AM_PREFIX + "resource.";
+
/** Command line arguments passed to the MR app master.*/
public static final String MR_AM_COMMAND_OPTS =
MR_AM_PREFIX+"command-opts";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index a23ff34..12a3079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.mapred;
+import static org.apache.commons.lang.StringUtils.isEmpty;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -93,6 +97,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -659,16 +665,76 @@ public class YARNRunner implements ClientProtocol {
private List<ResourceRequest> generateResourceRequests() throws IOException {
Resource capability = recordFactory.newRecordInstance(Resource.class);
- capability.setMemorySize(
- conf.getInt(
- MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
- )
- );
- capability.setVirtualCores(
- conf.getInt(
- MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
- )
- );
+ boolean memorySet = false;
+ boolean cpuVcoresSet = false;
+ List<ResourceInformation> resourceRequests = ResourceUtils
+ .getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX);
+ for (ResourceInformation resourceReq : resourceRequests) {
+ String resourceName = resourceReq.getName();
+ if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
+ resourceName)) {
+ if (memorySet) {
+ throw new IllegalArgumentException(
+ "Only one of the following keys " +
+ "can be specified for a single job: " +
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
+ }
+ String units = isEmpty(resourceReq.getUnits()) ?
+ ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
+ resourceReq.getUnits();
+ capability.setMemorySize(
+ UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue()));
+ memorySet = true;
+ if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) {
+ LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
+ resourceName + "=" + resourceReq.getValue() +
+ resourceReq.getUnits() + " is overriding the " +
+ MRJobConfig.MR_AM_VMEM_MB + "=" +
+ conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration");
+ }
+ } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) {
+ capability.setVirtualCores(
+ (int) UnitsConversionUtil.convert(resourceReq.getUnits(), "",
+ resourceReq.getValue()));
+ cpuVcoresSet = true;
+ if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) {
+ LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
+ resourceName + "=" + resourceReq.getValue() +
+ resourceReq.getUnits() + " is overriding the " +
+ MRJobConfig.MR_AM_CPU_VCORES + "=" +
+ conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration");
+ }
+ } else if (!MRJobConfig.MR_AM_VMEM_MB.equals(
+ MR_AM_RESOURCE_PREFIX + resourceName) &&
+ !MRJobConfig.MR_AM_CPU_VCORES.equals(
+ MR_AM_RESOURCE_PREFIX + resourceName)) {
+ // the "mb", "cpu-vcores" resource types are not processed here
+ // since the yarn.app.mapreduce.am.resource.mb,
+ // yarn.app.mapreduce.am.resource.cpu-vcores keys are used for
+ // backward-compatibility - which is handled after this loop
+ ResourceInformation resourceInformation = capability
+ .getResourceInformation(resourceName);
+ resourceInformation.setUnits(resourceReq.getUnits());
+ resourceInformation.setValue(resourceReq.getValue());
+ capability.setResourceInformation(resourceName, resourceInformation);
+ }
+ }
+ if (!memorySet) {
+ capability.setMemorySize(
+ conf.getInt(
+ MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+ )
+ );
+ }
+ if (!cpuVcoresSet) {
+ capability.setVirtualCores(
+ conf.getInt(
+ MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+ )
+ );
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("AppMaster capability = " + capability);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c79b08e..ecb396e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -32,10 +32,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -43,6 +45,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,6 +72,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -96,28 +100,37 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
+import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.ImmutableList;
+
/**
* Test YarnRunner and make sure the client side plugin works
* fine
@@ -131,6 +144,53 @@ public class TestYARNRunner {
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
+ private static class CustomResourceTypesConfigurationProvider
+ extends LocalConfigurationProvider {
+
+ @Override
+ public InputStream getConfigurationInputStream(Configuration bootstrapConf,
+ String name) throws YarnException, IOException {
+ if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
+ return new ByteArrayInputStream(
+ ("<configuration>\n" +
+ " <property>\n" +
+ " <name>yarn.resource-types</name>\n" +
+ " <value>a-custom-resource</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>yarn.resource-types.a-custom-resource.units</name>\n" +
+ " <value>G</value>\n" +
+ " </property>\n" +
+ "</configuration>\n").getBytes());
+ } else {
+ return super.getConfigurationInputStream(bootstrapConf, name);
+ }
+ }
+ }
+
+ private static class TestAppender extends AppenderSkeleton {
+
+ private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ protected void append(LoggingEvent arg0) {
+ logEvents.add(arg0);
+ }
+
+ private List<LoggingEvent> getLogEvents() {
+ return logEvents;
+ }
+ }
+
private YARNRunner yarnRunner;
private ResourceMgrDelegate resourceMgrDelegate;
private YarnConfiguration conf;
@@ -143,6 +203,11 @@ public class TestYARNRunner {
private ClientServiceDelegate clientDelegate;
private static final String failString = "Rejected job";
+ @BeforeClass
+ public static void setupBeforeClass() {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ }
+
@Before
public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
@@ -175,6 +240,7 @@ public class TestYARNRunner {
@After
public void cleanup() {
FileUtil.fullyDelete(testWorkDir);
+ ResourceUtils.resetResourceTypes(new Configuration());
}
@Test(timeout=20000)
@@ -884,4 +950,105 @@ public class TestYARNRunner {
.get("hadoop.tmp.dir").equals("testconfdir"));
UserGroupInformation.reset();
}
+
+ @Test
+ public void testCustomAMRMResourceType() throws Exception {
+ initResourceTypes();
+ String customResourceName = "a-custom-resource";
+
+ JobConf jobConf = new JobConf();
+
+ jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
+ customResourceName, 5);
+ jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
+
+ yarnRunner = new YARNRunner(jobConf);
+
+ submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+ List<ResourceRequest> resourceRequests =
+ submissionContext.getAMContainerResourceRequests();
+
+ Assert.assertEquals(1, resourceRequests.size());
+ ResourceRequest resourceRequest = resourceRequests.get(0);
+
+ ResourceInformation resourceInformation = resourceRequest.getCapability()
+ .getResourceInformation(customResourceName);
+ Assert.assertEquals("Expecting the default unit (G)",
+ "G", resourceInformation.getUnits());
+ Assert.assertEquals(5L, resourceInformation.getValue());
+ Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores());
+ }
+
+ @Test
+ public void testAMRMemoryRequest() throws Exception {
+ for (String memoryName : ImmutableList.of(
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
+
+ yarnRunner = new YARNRunner(jobConf);
+
+ submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+ List<ResourceRequest> resourceRequests =
+ submissionContext.getAMContainerResourceRequests();
+
+ Assert.assertEquals(1, resourceRequests.size());
+ ResourceRequest resourceRequest = resourceRequests.get(0);
+
+ long memorySize = resourceRequest.getCapability().getMemorySize();
+ Assert.assertEquals(3072, memorySize);
+ }
+ }
+
+ @Test
+ public void testAMRMemoryRequestOverriding() throws Exception {
+ for (String memoryName : ImmutableList.of(
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
+ TestAppender testAppender = new TestAppender();
+ Logger logger = Logger.getLogger(YARNRunner.class);
+ logger.addAppender(testAppender);
+ try {
+ JobConf jobConf = new JobConf();
+ jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
+ jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048);
+
+ yarnRunner = new YARNRunner(jobConf);
+
+ submissionContext = buildSubmitContext(yarnRunner, jobConf);
+
+ List<ResourceRequest> resourceRequests =
+ submissionContext.getAMContainerResourceRequests();
+
+ Assert.assertEquals(1, resourceRequests.size());
+ ResourceRequest resourceRequest = resourceRequests.get(0);
+
+ long memorySize = resourceRequest.getCapability().getMemorySize();
+ Assert.assertEquals(3072, memorySize);
+ boolean foundLogWarning = false;
+ for (LoggingEvent e : testAppender.getLogEvents()) {
+ if (e.getLevel() == Level.WARN && ("Configuration " +
+ "yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " +
+ "overriding the yarn.app.mapreduce.am.resource.mb=2048 " +
+ "configuration").equals(e.getMessage())) {
+ foundLogWarning = true;
+ break;
+ }
+ }
+ assertTrue(foundLogWarning);
+ } finally {
+ logger.removeAppender(testAppender);
+ }
+ }
+ }
+
+ private void initResourceTypes() {
+ Configuration configuration = new Configuration();
+ configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ CustomResourceTypesConfigurationProvider.class.getName());
+ ResourceUtils.resetResourceTypes(configuration);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index 65eb5a2..3806771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -44,7 +44,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
@@ -60,6 +63,8 @@ public class ResourceUtils {
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
+ private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
+ Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$");
private static volatile boolean initializedResources = false;
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
@@ -564,4 +569,43 @@ public class ResourceUtils {
}
return array;
}
+
+ /**
+ * From a given configuration, get all entries representing requested
+ * resources: entries that match the
+ * {prefix}{resourceName}={value}[ ][{units}] pattern.
+ * @param configuration The configuration
+ * @param prefix Keys with this prefix are considered from the configuration
+ * @return The list of requested resources as described by the configuration
+ */
+ public static List<ResourceInformation> getRequestedResourcesFromConfig(
+ Configuration configuration, String prefix) {
+ List<ResourceInformation> result = new ArrayList<>();
+ Map<String, String> customResourcesMap = configuration
+ .getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$");
+ for (Entry<String, String> resource : customResourcesMap.entrySet()) {
+ String resourceName = resource.getKey().substring(prefix.length());
+ Matcher matcher =
+ RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue());
+ if (!matcher.matches()) {
+ String errorMsg = "Invalid resource request specified for property "
+ + resource.getKey() + ": \"" + resource.getValue()
+ + "\", expected format is: value[ ][units]";
+ LOG.error(errorMsg);
+ throw new IllegalArgumentException(errorMsg);
+ }
+ long value = Long.parseLong(matcher.group(1));
+ String unit = matcher.group(2);
+ if (unit.isEmpty()) {
+ unit = ResourceUtils.getDefaultUnit(resourceName);
+ }
+ ResourceInformation resourceInformation = new ResourceInformation();
+ resourceInformation.setName(resourceName);
+ resourceInformation.setValue(value);
+ resourceInformation.setUnits(unit);
+ result.add(resourceInformation);
+ }
+ return result;
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org