You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2018/01/10 22:46:14 UTC
hadoop git commit: MAPREDUCE-6926. Allow MR jobs to opt out of
oversubscription. Contributed by Haibo Chen.
Repository: hadoop
Updated Branches:
refs/heads/YARN-1011 b646d79d9 -> 43f51bcdb
MAPREDUCE-6926. Allow MR jobs to opt out of oversubscription. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43f51bcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43f51bcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43f51bcd
Branch: refs/heads/YARN-1011
Commit: 43f51bcdb92db94549c4e472068629f7971285be
Parents: b646d79
Author: Miklos Szegedi <sz...@apache.org>
Authored: Wed Jan 10 13:21:11 2018 -0800
Committer: Miklos Szegedi <sz...@apache.org>
Committed: Wed Jan 10 13:21:11 2018 -0800
----------------------------------------------------------------------
.../v2/app/rm/RMContainerRequestor.java | 48 ++---
.../v2/app/rm/TestRMContainerAllocator.java | 192 +++++++++++++++++++
.../apache/hadoop/mapreduce/MRJobConfig.java | 6 +
.../src/main/resources/mapred-default.xml | 8 +
4 files changed, 231 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index bb3e1fa..d996690 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -111,6 +111,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> blacklistRemovals = Collections
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private boolean optOutOfOversubscription;
public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
@@ -136,9 +137,11 @@ public abstract class RMContainerRequestor extends RMCommunicator {
public ContainerRequest(ContainerRequestEvent event, Priority priority,
String nodeLabelExpression) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
- event.getRacks(), priority, nodeLabelExpression);
+ event.getRacks(), priority, System.currentTimeMillis(),
+ nodeLabelExpression);
}
+ @VisibleForTesting
public ContainerRequest(ContainerRequestEvent event, Priority priority,
long requestTimeMs) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
@@ -146,13 +149,6 @@ public abstract class RMContainerRequestor extends RMCommunicator {
}
public ContainerRequest(TaskAttemptId attemptID,
- Resource capability, String[] hosts, String[] racks,
- Priority priority, String nodeLabelExpression) {
- this(attemptID, capability, hosts, racks, priority,
- System.currentTimeMillis(), nodeLabelExpression);
- }
-
- public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority, long requestTimeMs,String nodeLabelExpression) {
this.attemptID = attemptID;
@@ -186,6 +182,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+ optOutOfOversubscription = conf.getBoolean(
+ MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT,
+ MRJobConfig.DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT);
+ LOG.info("optOutOfOversubscription is " + optOutOfOversubscription);
if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
throw new YarnRuntimeException("Invalid blacklistDisablePercent: "
+ blacklistDisablePercent
@@ -398,20 +398,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
for (String host : req.hosts) {
// Data-local
if (!isNodeBlacklisted(host)) {
- addResourceRequest(req.priority, host, req.capability,
+ addGuaranteedResourceRequest(req.priority, host, req.capability,
null);
}
}
// Nothing Rack-local for now
for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability,
+ addGuaranteedResourceRequest(req.priority, rack, req.capability,
null);
}
// Off-switch
- addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
- req.nodeLabelExpression);
+ addGuaranteedResourceRequest(req.priority, ResourceRequest.ANY,
+ req.capability, req.nodeLabelExpression);
}
protected void decContainerReq(ContainerRequest req) {
@@ -430,18 +430,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
protected void addOpportunisticResourceRequest(Priority priority,
Resource capability) {
addResourceRequest(priority, ResourceRequest.ANY, capability, null,
- ExecutionType.OPPORTUNISTIC);
+ ExecutionType.OPPORTUNISTIC, true);
}
- private void addResourceRequest(Priority priority, String resourceName,
- Resource capability, String nodeLabelExpression) {
+ private void addGuaranteedResourceRequest(Priority priority,
+ String resourceName, Resource capability, String nodeLabelExpression) {
addResourceRequest(priority, resourceName, capability, nodeLabelExpression,
- ExecutionType.GUARANTEED);
+ ExecutionType.GUARANTEED, optOutOfOversubscription);
}
private void addResourceRequest(Priority priority, String resourceName,
Resource capability, String nodeLabelExpression,
- ExecutionType executionType) {
+ ExecutionType executionType, boolean enforceExecutionType) {
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
@@ -464,8 +464,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
- remoteRequest.setExecutionTypeRequest(
- ExecutionTypeRequest.newInstance(executionType, true));
+ remoteRequest.setExecutionTypeRequest(ExecutionTypeRequest.
+ newInstance(executionType, enforceExecutionType));
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -473,9 +473,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(remoteRequest);
if (LOG.isDebugEnabled()) {
- LOG.debug("addResourceRequest:" + " applicationId="
+ LOG.debug("addGuaranteedResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
+ + " resourceName=" + resourceName + " ExecutionType=" + executionType
+ + " enforceExecutionType=" + enforceExecutionType + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
}
}
@@ -559,8 +560,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
}
}
String[] hosts = newHosts.toArray(new String[newHosts.size()]);
- ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
- hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
+ ContainerRequest newReq = new ContainerRequest(orig.attemptID,
+ orig.capability, hosts, orig.racks, orig.priority,
+ System.currentTimeMillis(), orig.nodeLabelExpression);
return newReq;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 85e4181..d95793d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -844,6 +846,196 @@ public class TestRMContainerAllocator {
allocator.close();
}
+ /**
+ * Test that a MapReduce job can be configured to opt out of oversubscription,
+ * that is, it always waits for guaranteed resources to execute its tasks.
+ * This is done by setting up a MapReduce job with 2 mappers and 1 reducer
+ * and capturing all ResourceRequests sent from the AM to RM, then checking
+ * if all ResourceRequests are guaranteed and their enforceExecutionType is
+ * true.
+ */
+ @Test
+ public void testMapReduceOptingOutOversubscription() throws Exception {
+ List<ResourceRequest> resourceRequests = captureResourceRequests(true);
+
+ for(ResourceRequest resourceRequest : resourceRequests) {
+ ExecutionTypeRequest executionTypeRequest =
+ resourceRequest.getExecutionTypeRequest();
+ if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance(
+ ExecutionType.GUARANTEED, true))) {
+ Assert.fail("The execution type of ResourceRequest " + resourceRequest +
+ " is not guaranteed or not enforced.");
+ }
+ }
+ }
+
+ /**
+ * Test that a MapReduce job can opt in to oversubscription (the default
+ * behavior). This is done by setting up a MapReduce job with 2
+ * mappers and 1 reducer and capturing all ResourceRequests sent from
+ * the AM to RM, then checking if all ResourceRequests are guaranteed
+ * but their enforceExecutionType is always set to false.
+ */
+ @Test
+ public void testMapReduceOptingInOversubscription() throws Exception {
+ List<ResourceRequest> resourceRequests = captureResourceRequests(false);
+
+ for(ResourceRequest resourceRequest : resourceRequests) {
+ ExecutionTypeRequest executionTypeRequest =
+ resourceRequest.getExecutionTypeRequest();
+ if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance(
+ ExecutionType.GUARANTEED, false))) {
+ Assert.fail("The execution type of ResourceRequest " + resourceRequest +
+ " is not guaranteed or it is enforced.");
+ }
+ }
+ }
+
+ /**
+ * Set up a mapreduce job with 2 mappers and 1 reducer and return
+ * all ResourceRequests sent from the AM to RM.
+ */
+ private List<ResourceRequest> captureResourceRequests(
+ boolean optOutOfOversubscription) throws Exception {
+ List<ResourceRequest> resourceRequests = new ArrayList<>();
+
+ final Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT,
+ optOutOfOversubscription);
+
+ // start the resource manager
+ final MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // submit an application
+ RMApp rmApp = rm.submitApp(1024);
+ rm.drainEvents();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
+ amNodeManager.nodeHeartbeat(true);
+ rm.drainEvents();
+
+ final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ rm.drainEvents();
+
+ // start the MR AM and wait until it is in running state
+ MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
+ appAttemptId, 0), 2, 1, false,
+ this.getClass().getName(), true, 1) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return new DrainDispatcher();
+ }
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new MyContainerAllocator(rm, appAttemptId, context);
+ };
+ };
+ mrApp.submit(conf);
+
+ Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+ .getValue();
+ DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+ MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+ .getContainerAllocator();
+ mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING);
+ amDispatcher.await();
+
+ // wait until all task attempts have requested containers
+ for (Task t : job.getTasks().values()) {
+ mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+ .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
+ }
+ amDispatcher.await();
+
+ // send map resource requests to RM
+ allocator.schedule();
+ rm.drainEvents();
+ for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+ resourceRequests.add(ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+ rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+ }
+
+ // wait for both map tasks to be running
+ amNodeManager.nodeHeartbeat(true);
+ rm.drainEvents();
+
+ allocator.schedule();
+ rm.drainEvents();
+ for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+ resourceRequests.add(ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+ rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+ }
+
+ for (Task t : job.getTasks().values()) {
+ if (t.getType() == TaskType.MAP) {
+ mrApp.waitForState(t, TaskState.RUNNING);
+ }
+ }
+
+ // finish both map tasks so that the reduce task can be scheduled
+ Iterator<Task> it = job.getTasks().values().iterator();
+ finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
+ allocator.schedule();
+ rm.drainEvents();
+ for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+ resourceRequests.add(ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+ rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+ }
+
+ // send the reduce resource requests to RM
+ allocator.schedule();
+ rm.drainEvents();
+ for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+ resourceRequests.add(ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+ rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+ }
+
+ // wait for the reduce task to be running
+ amNodeManager.nodeHeartbeat(true);
+ rm.drainEvents();
+
+ allocator.schedule();
+ rm.drainEvents();
+ for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+ resourceRequests.add(ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+ rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+ }
+
+ for (Task t : job.getTasks().values()) {
+ if (t.getType() == TaskType.REDUCE) {
+ mrApp.waitForState(t, TaskState.RUNNING);
+ }
+ }
+
+ // finish the reduce task
+ finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
+ allocator.schedule();
+ rm.drainEvents();
+ for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+ resourceRequests.add(ResourceRequest.newInstance(
+ rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+ rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+ }
+
+ return resourceRequests;
+ }
+
@Test
public void testMapReduceScheduling() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 6acf1bc..569ef9d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1165,6 +1165,12 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT = 0;
/**
+ * Opt out of YARN oversubscription so that the job always waits for
+ * GUARANTEED resources to become available in the cluster.
+ */
+ String MR_OVERSUBSCRIPTION_OPT_OUT = "mapreduce.job.oversubscription-opt-out";
+ boolean DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT = false;
+ /**
* A comma-separated list of properties whose value will be redacted.
*/
String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43f51bcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1e432ce..00ee7d4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -2065,4 +2065,12 @@
</description>
</property>
+<property>
+ <description>
+ Opts out of YARN oversubscription so that the job always waits for GUARANTEED
+ resources to become available.
+ </description>
+ <name>mapreduce.job.oversubscription-opt-out</name>
+ <value>false</value>
+</property>
</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org